Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/11/09 05:04:11 UTC

[iceberg] branch master updated: Spark: Support vectorized reads with position deletes (#3287)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new df4a0e5  Spark: Support vectorized reads with position deletes (#3287)
df4a0e5 is described below

commit df4a0e5c2ca78ad9d3edfbd4ad3bf751f88a6bc2
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Mon Nov 8 21:03:58 2021 -0800

    Spark: Support vectorized reads with position deletes (#3287)
---
 .../iceberg/arrow/vectorized/BaseBatchReader.java  |   2 +-
 .../arrow/vectorized/VectorizedReaderBuilder.java  |   4 +
 build.gradle                                       |   1 +
 .../iceberg/deletes/BitmapPositionDeleteIndex.java |  45 +++++
 .../java/org/apache/iceberg/deletes/Deletes.java   |  18 ++
 .../iceberg/deletes/PositionDeleteIndex.java       |  42 ++++
 .../org/apache/iceberg/util/TableScanUtil.java     |  10 +
 .../java/org/apache/iceberg/data/DeleteFilter.java |  19 ++
 .../org/apache/iceberg/data/DeleteReadTests.java   |  39 +++-
 spark/v2.4/build.gradle                            |   2 +
 spark/v3.0/build.gradle                            |   3 +
 spark/v3.1/build.gradle                            |   3 +
 spark/v3.2/build.gradle                            |   3 +
 .../spark/source/IcebergSourceBenchmark.java       |   5 +
 .../spark/source/IcebergSourceDeleteBenchmark.java | 211 +++++++++++++++++++++
 .../IcebergSourceParquetDeleteBenchmark.java       |  63 ++++++
 ...ebergSourceParquetMultiDeleteFileBenchmark.java |  60 ++++++
 ...gSourceParquetWithUnrelatedDeleteBenchmark.java |  62 ++++++
 .../data/vectorized/ColumnVectorWithFilter.java    | 105 ++++++++++
 .../spark/data/vectorized/ColumnarBatchReader.java |  82 +++++++-
 .../data/vectorized/IcebergArrowColumnVector.java  |   8 +
 .../vectorized/VectorizedSparkParquetReaders.java  |  40 ++++
 .../iceberg/spark/source/BaseDataReader.java       |   4 +
 .../iceberg/spark/source/BatchDataReader.java      |  32 +++-
 .../iceberg/spark/source/SparkBatchScan.java       |   9 +-
 .../data/TestSparkParquetReadMetadataColumns.java  |  61 ++++++
 .../spark/source/TestSparkReaderDeletes.java       |  49 ++++-
 versions.props                                     |   1 +
 28 files changed, 972 insertions(+), 11 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
index 10f4eb0..76b5fd5 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
@@ -42,7 +42,7 @@ public abstract class BaseBatchReader<T> implements VectorizedReader<T> {
   }
 
   @Override
-  public final void setRowGroupInfo(
+  public void setRowGroupInfo(
       PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
     for (VectorizedArrowReader reader : readers) {
       if (reader != null) {
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
index 2bace1e..ad69d5e 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
@@ -94,6 +94,10 @@ public class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedRea
         reorderedFields.add(VectorizedArrowReader.nulls());
       }
     }
+    return vectorizedReader(reorderedFields);
+  }
+
+  protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
     return readerFactory.apply(reorderedFields);
   }
 
diff --git a/build.gradle b/build.gradle
index e0097e9..3838874 100644
--- a/build.gradle
+++ b/build.gradle
@@ -215,6 +215,7 @@ project(':iceberg-core') {
     implementation "com.fasterxml.jackson.core:jackson-databind"
     implementation "com.fasterxml.jackson.core:jackson-core"
     implementation "com.github.ben-manes.caffeine:caffeine"
+    implementation "org.roaringbitmap:RoaringBitmap"
     compileOnly("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
new file mode 100644
index 0000000..e7675c3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+class BitmapPositionDeleteIndex implements PositionDeleteIndex {
+  private final Roaring64Bitmap roaring64Bitmap;
+
+  BitmapPositionDeleteIndex() {
+    roaring64Bitmap = new Roaring64Bitmap();
+  }
+
+  @Override
+  public void delete(long position) {
+    roaring64Bitmap.add(position);
+  }
+
+  @Override
+  public void delete(long posStart, long posEnd) {
+    roaring64Bitmap.add(posStart, posEnd);
+  }
+
+  @Override
+  public boolean deleted(long position) {
+    return roaring64Bitmap.contains(position);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index 62154f7..41c7793 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -107,6 +107,24 @@ public class Deletes {
     }
   }
 
+  public static <T extends StructLike> PositionDeleteIndex toPositionBitmap(CharSequence dataLocation,
+                                                                            List<CloseableIterable<T>> deleteFiles) {
+    DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocation);
+    List<CloseableIterable<Long>> positions = Lists.transform(deleteFiles, deletes ->
+        CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row)));
+    return toPositionBitmap(CloseableIterable.concat(positions));
+  }
+
+  public static PositionDeleteIndex toPositionBitmap(CloseableIterable<Long> posDeletes) {
+    try (CloseableIterable<Long> deletes = posDeletes) {
+      PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex();
+      deletes.forEach(positionDeleteIndex::delete);
+      return positionDeleteIndex;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close position delete source", e);
+    }
+  }
+
   public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows,
                                                          Function<T, Long> rowToPosition,
                                                          CloseableIterable<Long> posDeletes) {
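
A minimal usage sketch for the new Deletes.toPositionBitmap helper above (not part of this commit; the class name and the in-memory list of positions are illustrative, and CloseableIterable.withNoopClose simply wraps a plain Iterable):

    import java.util.Arrays;
    import org.apache.iceberg.deletes.Deletes;
    import org.apache.iceberg.deletes.PositionDeleteIndex;
    import org.apache.iceberg.io.CloseableIterable;

    public class ToPositionBitmapExample {
      public static void main(String[] args) {
        // positions of deleted rows in one data file, e.g. read from position delete files
        CloseableIterable<Long> posDeletes = CloseableIterable.withNoopClose(Arrays.asList(0L, 3L, 6L));
        PositionDeleteIndex index = Deletes.toPositionBitmap(posDeletes);
        System.out.println(index.deleted(3L));  // true
        System.out.println(index.deleted(4L));  // false
      }
    }
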
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
new file mode 100644
index 0000000..73ef397
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+public interface PositionDeleteIndex {
+  /**
+   * Set a deleted row position.
+   * @param position the deleted row position
+   */
+  void delete(long position);
+
+  /**
+   * Set a range of deleted row positions.
+   * @param posStart inclusive beginning of position range
+   * @param posEnd exclusive ending of position range
+   */
+  void delete(long posStart, long posEnd);
+
+  /**
+   * Checks whether a row at the position is deleted.
+   * @param position deleted row position
+   * @return whether the position is deleted
+   */
+  boolean deleted(long position);
+}
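
To make the contract above concrete, here is a naive Set-backed implementation sketch (illustrative only, not part of the patch; the real BitmapPositionDeleteIndex added above uses a Roaring64Bitmap, which stays compact even with millions of deleted positions):

    package org.apache.iceberg.deletes;

    import java.util.HashSet;
    import java.util.Set;

    // Naive reference implementation of PositionDeleteIndex, for illustration only.
    class SetPositionDeleteIndex implements PositionDeleteIndex {
      private final Set<Long> deleted = new HashSet<>();

      @Override
      public void delete(long position) {
        deleted.add(position);
      }

      @Override
      public void delete(long posStart, long posEnd) {
        // posEnd is exclusive per the interface javadoc
        for (long pos = posStart; pos < posEnd; pos++) {
          deleted.add(pos);
        }
      }

      @Override
      public boolean deleted(long position) {
        return deleted.contains(position);
      }
    }
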
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index e90cb8f..d8d2d68 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -23,6 +23,7 @@ import java.util.function.Function;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -37,6 +38,15 @@ public class TableScanUtil {
     return task.files().stream().anyMatch(TableScanUtil::hasDeletes);
   }
 
+  /**
+   * This is temporarily introduced since we plan to support vectorized reads for position deletes first and add
+   * equality-delete support later. This method will be removed once both are supported.
+   */
+  public static boolean hasEqDeletes(CombinedScanTask task) {
+    return task.files().stream().anyMatch(
+        t -> t.deletes().stream().anyMatch(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES)));
+  }
+
   public static boolean hasDeletes(FileScanTask task) {
     return !task.deletes().isEmpty();
   }
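
A sketch of how this check is meant to be used; it mirrors the SparkBatchScan change later in this patch, and the class and method names here are illustrative. ORC batch reads still require tasks with no delete files at all, while Parquet batch reads only need to avoid equality deletes:

    import java.util.List;
    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.util.TableScanUtil;

    class BatchReadGate {
      // ORC vectorized reads do not handle deletes yet, so any delete file disables them.
      static boolean orcBatchReadAllowed(List<CombinedScanTask> tasks) {
        return tasks.stream().noneMatch(TableScanUtil::hasDeletes);
      }

      // Parquet vectorized reads now handle position deletes, so only equality deletes disable them.
      static boolean parquetBatchReadAllowed(List<CombinedScanTask> tasks) {
        return tasks.stream().noneMatch(TableScanUtil::hasEqDeletes);
      }
    }
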
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index c4123d5..cf26172 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -69,6 +70,8 @@ public abstract class DeleteFilter<T> {
   private final Schema requiredSchema;
   private final Accessor<StructLike> posAccessor;
 
+  private PositionDeleteIndex deleteRowPositions = null;
+
   protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
     this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
     this.dataFile = task.file();
@@ -98,6 +101,10 @@ public abstract class DeleteFilter<T> {
     return requiredSchema;
   }
 
+  public boolean hasPosDeletes() {
+    return !posDeletes.isEmpty();
+  }
+
   Accessor<StructLike> posAccessor() {
     return posAccessor;
   }
@@ -185,6 +192,18 @@ public abstract class DeleteFilter<T> {
     return remainingRowsFilter.filter(records);
   }
 
+  public PositionDeleteIndex deletedRowPositions() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    if (deleteRowPositions == null) {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      deleteRowPositions = Deletes.toPositionBitmap(dataFile.path(), deletes);
+    }
+    return deleteRowPositions;
+  }
+
   private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
     if (posDeletes.isEmpty()) {
       return records;
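
A sketch of how a vectorized reader can consult the new accessors above instead of going through the row-by-row applyPosDeletes path (illustrative helper class, not part of the commit):

    import org.apache.iceberg.data.DeleteFilter;
    import org.apache.iceberg.deletes.PositionDeleteIndex;

    class PosDeleteLookup {
      // Returns true if the row at the given position of the task's data file should be skipped.
      static <T> boolean isDeleted(DeleteFilter<T> filter, long position) {
        if (!filter.hasPosDeletes()) {
          return false;
        }
        PositionDeleteIndex index = filter.deletedRowPositions();  // built once and cached by the filter
        return index != null && index.deleted(position);
      }
    }
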
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index 69b0a57..75e1657 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -75,9 +75,9 @@ public abstract class DeleteReadTests {
   protected String dateTableName = null;
   protected Table table = null;
   protected Table dateTable = null;
-  private List<Record> records = null;
+  protected List<Record> records = null;
   private List<Record> dateRecords = null;
-  private DataFile dataFile = null;
+  protected DataFile dataFile = null;
 
   @Before
   public void writeTestDataFile() throws IOException {
@@ -299,6 +299,39 @@ public abstract class DeleteReadTests {
   }
 
   @Test
+  public void testMultiplePosDeleteFiles() throws IOException {
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 3L) // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 6L) // id = 122
+    );
+
+    posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
+    StructLikeSet actual = rowSet(tableName, table, "*");
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
   public void testMixedPositionAndEqualityDeletes() throws IOException {
     Schema dataSchema = table.schema().select("data");
     Record dataDelete = GenericRecord.create(dataSchema);
@@ -411,7 +444,7 @@ public abstract class DeleteReadTests {
     return set;
   }
 
-  private static StructLikeSet rowSetWithoutIds(Table table, List<Record> recordList, int... idsToRemove) {
+  protected static StructLikeSet rowSetWithoutIds(Table table, List<Record> recordList, int... idsToRemove) {
     Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     recordList.stream()
diff --git a/spark/v2.4/build.gradle b/spark/v2.4/build.gradle
index 0789f12..22f4abb 100644
--- a/spark/v2.4/build.gradle
+++ b/spark/v2.4/build.gradle
@@ -65,6 +65,7 @@ project(':iceberg-spark:iceberg-spark2') {
     compileOnly "org.apache.avro:avro"
     compileOnly("org.apache.spark:spark-hive_2.11") {
       exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.roaringbitmap'
     }
 
     implementation("org.apache.orc:orc-core::nohive") {
@@ -159,6 +160,7 @@ project(':iceberg-spark:iceberg-spark-runtime') {
     relocate 'org.apache.arrow', 'org.apache.iceberg.shaded.org.apache.arrow'
     relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch'
     relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
+    relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
 
     classifier null
   }
diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle
index 7df4c62..2ad0fb0 100644
--- a/spark/v3.0/build.gradle
+++ b/spark/v3.0/build.gradle
@@ -64,6 +64,7 @@ project(':iceberg-spark:iceberg-spark3') {
     compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
+      exclude group: 'org.roaringbitmap'
     }
 
     implementation("org.apache.orc:orc-core::nohive") {
@@ -134,6 +135,7 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
     compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
+      exclude group: 'org.roaringbitmap'
     }
 
     testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
@@ -242,6 +244,7 @@ project(':iceberg-spark:iceberg-spark3-runtime') {
     relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
     // relocate Antlr runtime and related deps to shade Iceberg specific version
     relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4'
+    relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
 
     classifier null
   }
diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle
index 94befdf..510185a 100644
--- a/spark/v3.1/build.gradle
+++ b/spark/v3.1/build.gradle
@@ -64,6 +64,7 @@ project(':iceberg-spark:iceberg-spark-3.1') {
     compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
+      exclude group: 'org.roaringbitmap'
     }
 
     implementation("org.apache.orc:orc-core::nohive") {
@@ -134,6 +135,7 @@ project(":iceberg-spark:iceberg-spark-3.1-extensions") {
     compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
+      exclude group: 'org.roaringbitmap'
     }
 
     testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
@@ -242,6 +244,7 @@ project(':iceberg-spark:iceberg-spark-3.1-runtime') {
     relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
     // relocate Antlr runtime and related deps to shade Iceberg specific version
     relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4'
+    relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
 
     classifier null
   }
diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle
index 2a2ff62..8b0200f 100644
--- a/spark/v3.2/build.gradle
+++ b/spark/v3.2/build.gradle
@@ -66,6 +66,7 @@ project(':iceberg-spark:iceberg-spark-3.2') {
       exclude group: 'org.apache.arrow'
       // to make sure io.netty.buffer only comes from project(':iceberg-arrow')
       exclude group: 'io.netty', module: 'netty-buffer'
+      exclude group: 'org.roaringbitmap'
     }
 
     implementation("org.apache.orc:orc-core::nohive") {
@@ -138,6 +139,7 @@ project(":iceberg-spark:iceberg-spark-3.2-extensions") {
       exclude group: 'org.apache.arrow'
       // to make sure io.netty.buffer only comes from project(':iceberg-arrow')
       exclude group: 'io.netty', module: 'netty-buffer'
+      exclude group: 'org.roaringbitmap'
     }
 
     testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
@@ -244,6 +246,7 @@ project(':iceberg-spark:iceberg-spark-3.2-runtime') {
     relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
     // relocate Antlr runtime and related deps to shade Iceberg specific version
     relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4'
+    relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
 
     classifier null
   }
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
index 2be6a8c..ca1e92f 100644
--- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.UpdateProperties;
@@ -187,4 +188,8 @@ public abstract class IcebergSourceBenchmark {
       restoreProperties.commit();
     }
   }
+
+  protected FileFormat fileFormat() {
+    throw new UnsupportedOperationException("Unsupported file format");
+  }
 }
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
new file mode 100644
index 0000000..d5a6db3
--- /dev/null
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+public abstract class IcebergSourceDeleteBenchmark extends IcebergSourceBenchmark {
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class);
+  private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024;
+
+  protected static final int NUM_FILES = 1;
+  protected static final int NUM_ROWS = 10 * 1000 * 1000;
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    setupSpark();
+    appendData();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readIcebergVectorized() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true");
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  protected abstract void appendData() throws IOException;
+
+  protected void writeData(int fileNum) {
+    Dataset<Row> df = spark().range(NUM_ROWS)
+        .withColumnRenamed("id", "longCol")
+        .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)"))
+        .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+        .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+        .withColumn("dateCol", date_add(current_date(), fileNum))
+        .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+        .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));
+    appendAsFile(df);
+  }
+
+  @Override
+  protected Table initTable() {
+    Schema schema = new Schema(
+        required(1, "longCol", Types.LongType.get()),
+        required(2, "intCol", Types.IntegerType.get()),
+        required(3, "floatCol", Types.FloatType.get()),
+        optional(4, "doubleCol", Types.DoubleType.get()),
+        optional(6, "dateCol", Types.DateType.get()),
+        optional(7, "timestampCol", Types.TimestampType.withZone()),
+        optional(8, "stringCol", Types.StringType.get()));
+    PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+    HadoopTables tables = new HadoopTables(hadoopConf());
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    properties.put(TableProperties.FORMAT_VERSION, "2");
+    return tables.create(schema, partitionSpec, properties, newTableLocation());
+  }
+
+  @Override
+  protected Configuration initHadoopConf() {
+    return new Configuration();
+  }
+
+  protected void writePosDeletes(CharSequence path, long numRows, double percentage) throws IOException {
+    writePosDeletes(path, numRows, percentage, 1);
+  }
+
+  protected void writePosDeletes(CharSequence path, long numRows, double percentage,
+                                 int numDeleteFile) throws IOException {
+    writePosDeletesWithNoise(path, numRows, percentage, 0, numDeleteFile);
+  }
+
+  protected void writePosDeletesWithNoise(CharSequence path, long numRows, double percentage, int numNoise,
+                                          int numDeleteFile) throws IOException {
+    Set<Long> deletedPos = Sets.newHashSet();
+    while (deletedPos.size() < numRows * percentage) {
+      deletedPos.add(ThreadLocalRandom.current().nextLong(numRows));
+    }
+    LOG.info("pos delete row count: {}, num of delete files: {}", deletedPos.size(), numDeleteFile);
+
+    int partitionSize = (int) (numRows * percentage) / numDeleteFile;
+    Iterable<List<Long>> sets = Iterables.partition(deletedPos, partitionSize);
+    for (List<Long> item : sets) {
+      writePosDeletes(path, item, numNoise);
+    }
+  }
+
+  protected void writePosDeletes(CharSequence path, List<Long> deletedPos, int numNoise) throws IOException {
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .build();
+
+    ClusteredPositionDeleteWriter<InternalRow> writer = new ClusteredPositionDeleteWriter<>(
+        writerFactory, fileFactory, table().io(),
+        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    PartitionSpec unpartitionedSpec = table().specs().get(0);
+
+    PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+    try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
+      for (Long pos : deletedPos) {
+        positionDelete.set(path, pos, null);
+        closeableWriter.write(positionDelete, unpartitionedSpec, null);
+        for (int i = 0; i < numNoise; i++) {
+          positionDelete.set(noisePath(path), pos, null);
+          closeableWriter.write(positionDelete, unpartitionedSpec, null);
+        }
+      }
+    }
+
+    RowDelta rowDelta = table().newRowDelta();
+    writer.result().deleteFiles().forEach(rowDelta::addDeletes);
+    rowDelta.validateDeletedFiles().commit();
+  }
+
+  private OutputFileFactory newFileFactory() {
+    return OutputFileFactory.builderFor(table(), 1, 1)
+        .format(fileFormat())
+        .build();
+  }
+
+  private CharSequence noisePath(CharSequence path) {
+    // assume the data file name would be something like "00000-0-30da64e0-56b5-4743-a11b-3188a1695bf7-00001.parquet",
+    // so the data file suffix is roughly the UUID string length plus the length of "-00001.parquet" (36 + 14 = 50).
+    // It's OK for dataFileSuffixLen to be approximate here.
+    int dataFileSuffixLen = 60;
+    UUID uuid = UUID.randomUUID();
+    if (path.length() > dataFileSuffixLen) {
+      return path.subSequence(0, dataFileSuffixLen) + uuid.toString();
+    } else {
+      return uuid.toString();
+    }
+  }
+}
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java
new file mode 100644
index 0000000..234c6c5
--- /dev/null
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark;
+import org.openjdk.jmh.annotations.Param;
+
+/**
+ * A benchmark that evaluates non-vectorized reads and vectorized reads with position deletes in the Spark data
+ * source for Iceberg.
+ * <p>
+ * This class uses a dataset with a flat schema.
+ * To run this benchmark for spark-3.2:
+ * <code>
+ * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh
+ *   -PjmhIncludeRegex=IcebergSourceParquetDeleteBenchmark
+ *   -PjmhOutputPath=benchmark/iceberg-source-parquet-delete-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceParquetDeleteBenchmark extends IcebergSourceDeleteBenchmark {
+  @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"})
+  private double percentDeleteRow;
+
+  @Override
+  protected void appendData() throws IOException {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      writeData(fileNum);
+
+      if (percentDeleteRow > 0) {
+        // add pos-deletes
+        table().refresh();
+        for (DataFile file : table().currentSnapshot().addedFiles()) {
+          writePosDeletes(file.path(), NUM_ROWS, percentDeleteRow);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected FileFormat fileFormat() {
+    return FileFormat.PARQUET;
+  }
+}
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java
new file mode 100644
index 0000000..fb39331
--- /dev/null
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark;
+import org.openjdk.jmh.annotations.Param;
+
+/**
+ * A benchmark that evaluates non-vectorized reads and vectorized reads with position deletes in the Spark data
+ * source for Iceberg.
+ * <p>
+ * This class uses a dataset with a flat schema.
+ * To run this benchmark for spark-3.2:
+ * <code>
+ * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh
+ *   -PjmhIncludeRegex=IcebergSourceParquetMultiDeleteFileBenchmark
+ *   -PjmhOutputPath=benchmark/iceberg-source-parquet-multi-delete-file-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceParquetMultiDeleteFileBenchmark extends IcebergSourceDeleteBenchmark {
+  @Param({"1", "2", "5", "10"})
+  private int numDeleteFile;
+
+  @Override
+  protected void appendData() throws IOException {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      writeData(fileNum);
+
+      table().refresh();
+      for (DataFile file : table().currentSnapshot().addedFiles()) {
+        writePosDeletes(file.path(), NUM_ROWS, 0.25, numDeleteFile);
+      }
+    }
+  }
+
+  @Override
+  protected FileFormat fileFormat() {
+    return FileFormat.PARQUET;
+  }
+}
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java
new file mode 100644
index 0000000..a06cadb
--- /dev/null
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark;
+import org.openjdk.jmh.annotations.Param;
+
+/**
+ * A benchmark that evaluates non-vectorized reads and vectorized reads with position deletes in the Spark data
+ * source for Iceberg.
+ * <p>
+ * This class uses a dataset with a flat schema.
+ * To run this benchmark for spark-3.2:
+ * <code>
+ * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh
+ *   -PjmhIncludeRegex=IcebergSourceParquetWithUnrelatedDeleteBenchmark
+ *   -PjmhOutputPath=benchmark/iceberg-source-parquet-with-unrelated-delete-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceParquetWithUnrelatedDeleteBenchmark extends IcebergSourceDeleteBenchmark {
+  private static final double PERCENT_DELETE_ROW = 0.05;
+  @Param({"0", "0.05", "0.25", "0.5"})
+  private double percentUnrelatedDeletes;
+
+  @Override
+  protected void appendData() throws IOException {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      writeData(fileNum);
+
+      table().refresh();
+      for (DataFile file : table().currentSnapshot().addedFiles()) {
+        writePosDeletesWithNoise(file.path(), NUM_ROWS, PERCENT_DELETE_ROW,
+            (int) (percentUnrelatedDeletes / PERCENT_DELETE_ROW), 1);
+      }
+    }
+  }
+
+  @Override
+  protected FileFormat fileFormat() {
+    return FileFormat.PARQUET;
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java
new file mode 100644
index 0000000..7804c02
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class ColumnVectorWithFilter extends IcebergArrowColumnVector {
+  private final int[] rowIdMapping;
+
+  public ColumnVectorWithFilter(VectorHolder holder, int[] rowIdMapping) {
+    super(holder);
+    this.rowIdMapping = rowIdMapping;
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    return nullabilityHolder().isNullAt(rowIdMapping[rowId]) == 1;
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    return accessor().getBoolean(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    return accessor().getInt(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    return accessor().getLong(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    return accessor().getFloat(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    return accessor().getDouble(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public ColumnarArray getArray(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor().getArray(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor().getDecimal(rowIdMapping[rowId], precision, scale);
+  }
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor().getUTF8String(rowIdMapping[rowId]);
+  }
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    if (isNullAt(rowId)) {
+      return null;
+    }
+    return accessor().getBinary(rowIdMapping[rowId]);
+  }
+
+  public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) {
+    return holder.isDummy() ?
+        new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) :
+        new ColumnVectorWithFilter(holder, rowIdMapping);
+  }
+}
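
ColumnVectorWithFilter never copies vector data; every accessor call redirects the logical row id through rowIdMapping to the physical row in the underlying Arrow vector. The effect, shown on a plain array (a standalone sketch, not part of the commit):

    class RowIdMappingDemo {
      public static void main(String[] args) {
        int[] physical = {10, 11, 12, 13, 14};  // values held by the underlying vector
        int[] rowIdMapping = {1, 3, 4};         // physical rows 0 and 2 are deleted
        for (int rowId = 0; rowId < rowIdMapping.length; rowId++) {
          // getInt(rowId) on ColumnVectorWithFilter reads accessor().getInt(rowIdMapping[rowId])
          System.out.println(physical[rowIdMapping[rowId]]);  // prints 11, 13, 14
        }
      }
    }
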
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index f71a696..cc4858f 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -20,10 +20,18 @@
 package org.apache.iceberg.spark.data.vectorized;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
@@ -33,12 +41,25 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
  * {@linkplain VectorizedArrowReader VectorReader(s)}.
  */
 public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
+  private DeleteFilter<InternalRow> deletes = null;
+  private long rowStartPosInBatch = 0;
 
   public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
     super(readers);
   }
 
   @Override
+  public void setRowGroupInfo(PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData,
+                              long rowPosition) {
+    super.setRowGroupInfo(pageStore, metaData, rowPosition);
+    this.rowStartPosInBatch = rowPosition;
+  }
+
+  public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
+    this.deletes = deleteFilter;
+  }
+
+  @Override
   public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
     Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
     ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];
@@ -47,6 +68,8 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
       closeVectors();
     }
 
+    Pair<int[], Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
     for (int i = 0; i < readers.length; i += 1) {
       vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
       int numRowsInVector = vectorHolders[i].numValues();
@@ -54,11 +77,64 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
           numRowsInVector == numRowsToRead,
           "Number of rows in the vector %s didn't match expected %s ", numRowsInVector,
           numRowsToRead);
-      arrowColumnVectors[i] =
-          IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+
+      if (rowIdMapping == null) {
+        arrowColumnVectors[i] = IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+      } else {
+        int[] rowIdMap = rowIdMapping.first();
+        Integer numRows = rowIdMapping.second();
+        arrowColumnVectors[i] = ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows);
+      }
     }
+
+    rowStartPosInBatch += numRowsToRead;
     ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
-    batch.setNumRows(numRowsToRead);
+
+    if (rowIdMapping == null) {
+      batch.setNumRows(numRowsToRead);
+    } else {
+      Integer numRows = rowIdMapping.second();
+      batch.setNumRows(numRows);
+    }
     return batch;
   }
+
+  private Pair<int[], Integer> rowIdMapping(int numRows) {
+    if (deletes != null && deletes.hasPosDeletes()) {
+      return buildRowIdMapping(deletes.deletedRowPositions(), numRows);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Builds a row id mapping inside a batch that skips deleted rows. For example, if the 1st and 3rd rows are deleted
+   * in a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new number of rows is 3.
+   * @param deletedRowPositions a set of deleted row positions
+   * @param numRows the number of rows in the batch
+   * @return a pair of the mapping array and the new number of rows in the batch, or null if no row is deleted
+   */
+  private Pair<int[], Integer> buildRowIdMapping(PositionDeleteIndex deletedRowPositions, int numRows) {
+    if (deletedRowPositions == null) {
+      return null;
+    }
+
+    int[] rowIdMapping = new int[numRows];
+    int originalRowId = 0;
+    int currentRowId = 0;
+    while (originalRowId < numRows) {
+      if (!deletedRowPositions.deleted(originalRowId + rowStartPosInBatch)) {
+        rowIdMapping[currentRowId] = originalRowId;
+        currentRowId++;
+      }
+      originalRowId++;
+    }
+
+    if (currentRowId == numRows) {
+      // there is no delete in this batch
+      return null;
+    } else {
+      return Pair.of(rowIdMapping, currentRowId);
+    }
+  }
 }
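
The mapping example from the buildRowIdMapping javadoc can be reproduced with a simplified standalone version (a sketch that substitutes a Set for the PositionDeleteIndex, assumes rowStartPosInBatch is 0, and trims the array instead of returning a Pair):

    import java.util.Arrays;
    import java.util.Set;

    class BuildRowIdMappingDemo {
      static int[] buildRowIdMapping(Set<Long> deletedPositions, int numRows) {
        int[] rowIdMapping = new int[numRows];
        int currentRowId = 0;
        for (int originalRowId = 0; originalRowId < numRows; originalRowId++) {
          if (!deletedPositions.contains((long) originalRowId)) {
            rowIdMapping[currentRowId++] = originalRowId;  // keep only live rows
          }
        }
        return Arrays.copyOf(rowIdMapping, currentRowId);
      }

      public static void main(String[] args) {
        // 1st and 3rd rows (positions 0 and 2) deleted in a 5-row batch
        System.out.println(Arrays.toString(buildRowIdMapping(Set.of(0L, 2L), 5)));  // [1, 3, 4]
      }
    }
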
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
index 514eec8..c5b7853 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
@@ -48,6 +48,14 @@ public class IcebergArrowColumnVector extends ColumnVector {
     this.accessor = ArrowVectorAccessors.getVectorAccessor(holder);
   }
 
+  protected ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> accessor() {
+    return accessor;
+  }
+
+  protected NullabilityHolder nullabilityHolder() {
+    return nullabilityHolder;
+  }
+
   @Override
   public void close() {
     accessor.close();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index b2d5823..020b35f 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -19,12 +19,17 @@
 
 package org.apache.iceberg.spark.data.vectorized;
 
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
+import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.catalyst.InternalRow;
 
 public class VectorizedSparkParquetReaders {
 
@@ -49,4 +54,39 @@ public class VectorizedSparkParquetReaders {
                 expectedSchema, fileSchema, setArrowValidityVector,
                 idToConstant, ColumnarBatchReader::new));
   }
+
+  public static ColumnarBatchReader buildReader(Schema expectedSchema,
+                                                MessageType fileSchema,
+                                                boolean setArrowValidityVector,
+                                                Map<Integer, ?> idToConstant,
+                                                DeleteFilter<InternalRow> deleteFilter) {
+    return (ColumnarBatchReader)
+        TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+            new ReaderBuilder(
+                expectedSchema, fileSchema, setArrowValidityVector,
+                idToConstant, ColumnarBatchReader::new, deleteFilter));
+  }
+
+  private static class ReaderBuilder extends VectorizedReaderBuilder {
+    private final DeleteFilter<InternalRow> deleteFilter;
+
+    ReaderBuilder(Schema expectedSchema,
+                  MessageType parquetSchema,
+                  boolean setArrowValidityVector,
+                  Map<Integer, ?> idToConstant,
+                  Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
+                  DeleteFilter<InternalRow> deleteFilter) {
+      super(expectedSchema, parquetSchema, setArrowValidityVector, idToConstant, readerFactory);
+      this.deleteFilter = deleteFilter;
+    }
+
+    @Override
+    protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
+      VectorizedReader<?> reader = super.vectorizedReader(reorderedFields);
+      if (deleteFilter != null) {
+        ((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter);
+      }
+      return reader;
+    }
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index b58745c..f0664c7 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -91,6 +91,10 @@ abstract class BaseDataReader<T> implements Closeable {
     this.currentIterator = CloseableIterator.empty();
   }
 
+  protected Table table() {
+    return table;
+  }
+
   public boolean next() throws IOException {
     try {
       while (true) {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index e4bd3ce..5f05c55 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -28,8 +28,10 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
@@ -38,10 +40,12 @@ import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 class BatchDataReader extends BaseDataReader<ColumnarBatch> {
@@ -71,11 +75,14 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
     InputFile location = getInputFile(task);
     Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
     if (task.file().format() == FileFormat.PARQUET) {
+      SparkDeleteFilter deleteFilter = deleteFilter(task);
+
       Parquet.ReadBuilder builder = Parquet.read(location)
           .project(expectedSchema)
           .split(task.start(), task.length())
           .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema,
-              fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant))
+              fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant,
+              deleteFilter))
           .recordsPerBatch(batchSize)
           .filter(task.residual())
           .caseSensitive(caseSensitive)
@@ -114,4 +121,27 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
     }
     return iter.iterator();
   }
+
+  private SparkDeleteFilter deleteFilter(FileScanTask task) {
+    return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task, table().schema(), expectedSchema);
+  }
+
+  private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
+    private final InternalRowWrapper asStructLike;
+
+    SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+      super(task, tableSchema, requestedSchema);
+      this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+    }
+
+    @Override
+    protected StructLike asStructLike(InternalRow row) {
+      return asStructLike.wrap(row);
+    }
+
+    @Override
+    protected InputFile getInputFile(String location) {
+      return BatchDataReader.this.getInputFile(location);
+    }
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index bb7ac94..e1420ed 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -175,10 +175,15 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
 
     boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
 
+    boolean hasNoEqDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasEqDeletes);
+
     boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks);
 
-    boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
-        (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
+    boolean batchReadOrc = hasNoDeleteFiles && allOrcFileScanTasks;
+
+    boolean batchReadParquet = hasNoEqDeleteFiles && allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives;
+
+    boolean readUsingBatch = batchReadsEnabled && (batchReadOrc || batchReadParquet);
 
     int batchSize = readUsingBatch ? batchSize(allParquetFileScanTasks, allOrcFileScanTasks) : 0;
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index b68db02..8e42c6e 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -24,12 +24,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
@@ -38,6 +41,8 @@ import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.Types;
@@ -60,6 +65,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestSparkParquetReadMetadataColumns {
@@ -166,6 +173,56 @@ public class TestSparkParquetReadMetadataColumns {
   }
 
   @Test
+  public void testReadRowNumbersWithDelete() throws IOException {
+    if (vectorized) {
+      List<InternalRow> expectedRowsAfterDelete = new ArrayList<>(EXPECTED_ROWS);
+      // remove rows at positions 98, 99, 100, 101, 102; this range crosses the two row groups [0, 100) and [100, 200)
+      for (int i = 1; i <= 5; i++) {
+        expectedRowsAfterDelete.remove(98);
+      }
+
+      Parquet.ReadBuilder builder = Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA);
+
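+      // mock a DeleteFilter that reports position deletes in [98, 103); the vectorized reader is expected to drop those rows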
+      DeleteFilter deleteFilter = mock(DeleteFilter.class);
+      when(deleteFilter.hasPosDeletes()).thenReturn(true);
+      PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
+      deletedRowPos.delete(98, 103);
+      when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos);
+
+      builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(PROJECTION_SCHEMA,
+              fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, Maps.newHashMap(), deleteFilter));
+      builder.recordsPerBatch(RECORDS_PER_BATCH);
+
+      validate(expectedRowsAfterDelete, builder);
+    }
+  }
+
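+  // minimal Set-backed PositionDeleteIndex for testing; delete(posStart, posEnd) treats posEnd as exclusive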
+  private class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+    private final Set<Long> deleteIndex;
+
+    private CustomizedPositionDeleteIndex() {
+      deleteIndex = Sets.newHashSet();
+    }
+
+    @Override
+    public void delete(long position) {
+      deleteIndex.add(position);
+    }
+
+    @Override
+    public void delete(long posStart, long posEnd) {
+      for (long l = posStart; l < posEnd; l++) {
+        delete(l);
+      }
+    }
+
+    @Override
+    public boolean deleted(long position) {
+      return deleteIndex.contains(position);
+    }
+  }
+
+  @Test
   public void testReadRowNumbersWithFilter() throws IOException {
     // current Iceberg supports row group filtering.
     for (int i = 1; i < 5; i += 1) {
@@ -212,6 +269,10 @@ public class TestSparkParquetReadMetadataColumns {
       builder = builder.split(splitStart, splitLength);
     }
 
+    validate(expected, builder);
+  }
+
+  private void validate(List<InternalRow> expected, Parquet.ReadBuilder builder) throws IOException {
     try (CloseableIterable<InternalRow> reader = vectorized ? batchesToRows(builder.build()) : builder.build()) {
       final Iterator<InternalRow> actualRows = reader.iterator();
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index b7398b7..e1c6355 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -49,6 +49,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkStructLike;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.sql.Dataset;
@@ -59,14 +61,30 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
+@RunWith(Parameterized.class)
 public class TestSparkReaderDeletes extends DeleteReadTests {
 
   private static TestHiveMetastore metastore = null;
   protected static SparkSession spark = null;
   protected static HiveCatalog catalog = null;
+  private final boolean vectorized;
+
+  public TestSparkReaderDeletes(boolean vectorized) {
+    this.vectorized = vectorized;
+  }
+
+  @Parameterized.Parameters(name = "vectorized = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {false},
+        new Object[] {true}
+    };
+  }
 
   @BeforeClass
   public static void startMetastoreAndSpark() {
@@ -106,7 +124,12 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
     TableOperations ops = ((BaseTable) table).operations();
     TableMetadata meta = ops.current();
     ops.commit(meta, meta.upgradeToFormatVersion(2));
-
+    if (vectorized) {
+      table.updateProperties()
+          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
+          .set(TableProperties.PARQUET_BATCH_SIZE, "4") // split the 7 records into two batches to cover more code paths
+          .commit();
+    }
     return table;
   }
 
@@ -215,4 +238,28 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
     Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size());
     Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet);
   }
+
+  @Test
+  public void testPosDeletesAllRowsInBatch() throws IOException {
+    // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all deleted.
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 1L), // id = 43
+        Pair.of(dataFile.path(), 2L), // id = 61
+        Pair.of(dataFile.path(), 3L) // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 43, 61, 89);
+    StructLikeSet actual = rowSet(tableName, table, "*");
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
 }
diff --git a/versions.props b/versions.props
index fa6505a..0becbf5 100644
--- a/versions.props
+++ b/versions.props
@@ -16,6 +16,7 @@ com.google.guava:guava = 28.0-jre
 com.github.ben-manes.caffeine:caffeine = 2.8.4
 org.apache.arrow:arrow-vector = 6.0.0
 org.apache.arrow:arrow-memory-netty = 6.0.0
+org.roaringbitmap:RoaringBitmap = 0.9.0
 io.netty:netty-buffer = 4.1.63.Final
 com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1
 com.aliyun.oss:aliyun-sdk-oss = 3.10.2