Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/11/09 05:04:11 UTC
[iceberg] branch master updated: Spark: Support vectorized reads with position deletes (#3287)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new df4a0e5 Spark: Support vectorized reads with position deletes (#3287)
df4a0e5 is described below
commit df4a0e5c2ca78ad9d3edfbd4ad3bf751f88a6bc2
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Mon Nov 8 21:03:58 2021 -0800
Spark: Support vectorized reads with position deletes (#3287)
---
.../iceberg/arrow/vectorized/BaseBatchReader.java | 2 +-
.../arrow/vectorized/VectorizedReaderBuilder.java | 4 +
build.gradle | 1 +
.../iceberg/deletes/BitmapPositionDeleteIndex.java | 45 +++++
.../java/org/apache/iceberg/deletes/Deletes.java | 18 ++
.../iceberg/deletes/PositionDeleteIndex.java | 42 ++++
.../org/apache/iceberg/util/TableScanUtil.java | 10 +
.../java/org/apache/iceberg/data/DeleteFilter.java | 19 ++
.../org/apache/iceberg/data/DeleteReadTests.java | 39 +++-
spark/v2.4/build.gradle | 2 +
spark/v3.0/build.gradle | 3 +
spark/v3.1/build.gradle | 3 +
spark/v3.2/build.gradle | 3 +
.../spark/source/IcebergSourceBenchmark.java | 5 +
.../spark/source/IcebergSourceDeleteBenchmark.java | 211 +++++++++++++++++++++
.../IcebergSourceParquetDeleteBenchmark.java | 63 ++++++
...ebergSourceParquetMultiDeleteFileBenchmark.java | 60 ++++++
...gSourceParquetWithUnrelatedDeleteBenchmark.java | 62 ++++++
.../data/vectorized/ColumnVectorWithFilter.java | 105 ++++++++++
.../spark/data/vectorized/ColumnarBatchReader.java | 82 +++++++-
.../data/vectorized/IcebergArrowColumnVector.java | 8 +
.../vectorized/VectorizedSparkParquetReaders.java | 40 ++++
.../iceberg/spark/source/BaseDataReader.java | 4 +
.../iceberg/spark/source/BatchDataReader.java | 32 +++-
.../iceberg/spark/source/SparkBatchScan.java | 9 +-
.../data/TestSparkParquetReadMetadataColumns.java | 61 ++++++
.../spark/source/TestSparkReaderDeletes.java | 49 ++++-
versions.props | 1 +
28 files changed, 972 insertions(+), 11 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
index 10f4eb0..76b5fd5 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/BaseBatchReader.java
@@ -42,7 +42,7 @@ public abstract class BaseBatchReader<T> implements VectorizedReader<T> {
}
@Override
- public final void setRowGroupInfo(
+ public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
for (VectorizedArrowReader reader : readers) {
if (reader != null) {
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
index 2bace1e..ad69d5e 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
@@ -94,6 +94,10 @@ public class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedRea
reorderedFields.add(VectorizedArrowReader.nulls());
}
}
+ return vectorizedReader(reorderedFields);
+ }
+
+ protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
return readerFactory.apply(reorderedFields);
}
diff --git a/build.gradle b/build.gradle
index e0097e9..3838874 100644
--- a/build.gradle
+++ b/build.gradle
@@ -215,6 +215,7 @@ project(':iceberg-core') {
implementation "com.fasterxml.jackson.core:jackson-databind"
implementation "com.fasterxml.jackson.core:jackson-core"
implementation "com.github.ben-manes.caffeine:caffeine"
+ implementation "org.roaringbitmap:RoaringBitmap"
compileOnly("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
new file mode 100644
index 0000000..e7675c3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+class BitmapPositionDeleteIndex implements PositionDeleteIndex {
+ private final Roaring64Bitmap roaring64Bitmap;
+
+ BitmapPositionDeleteIndex() {
+ roaring64Bitmap = new Roaring64Bitmap();
+ }
+
+ @Override
+ public void delete(long position) {
+ roaring64Bitmap.add(position);
+ }
+
+ @Override
+ public void delete(long posStart, long posEnd) {
+ roaring64Bitmap.add(posStart, posEnd);
+ }
+
+ @Override
+ public boolean deleted(long position) {
+ return roaring64Bitmap.contains(position);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index 62154f7..41c7793 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -107,6 +107,24 @@ public class Deletes {
}
}
+ public static <T extends StructLike> PositionDeleteIndex toPositionBitmap(CharSequence dataLocation,
+ List<CloseableIterable<T>> deleteFiles) {
+ DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocation);
+ List<CloseableIterable<Long>> positions = Lists.transform(deleteFiles, deletes ->
+ CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row)));
+ return toPositionBitmap(CloseableIterable.concat(positions));
+ }
+
+ public static PositionDeleteIndex toPositionBitmap(CloseableIterable<Long> posDeletes) {
+ try (CloseableIterable<Long> deletes = posDeletes) {
+ PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex();
+ deletes.forEach(positionDeleteIndex::delete);
+ return positionDeleteIndex;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close position delete source", e);
+ }
+ }
+
public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes) {
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
new file mode 100644
index 0000000..73ef397
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+public interface PositionDeleteIndex {
+ /**
+ * Set a deleted row position.
+ * @param position the deleted row position
+ */
+ void delete(long position);
+
+ /**
+ * Set a range of deleted row positions.
+ * @param posStart inclusive beginning of position range
+ * @param posEnd exclusive ending of position range
+ */
+ void delete(long posStart, long posEnd);
+
+ /**
+ * Checks whether a row at the position is deleted.
+ * @param position deleted row position
+ * @return whether the position is deleted
+ */
+ boolean deleted(long position);
+}
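For illustration, a minimal sketch of how a reader might consult this index (assumption for demonstration only: the calling code lives in the org.apache.iceberg.deletes package, since BitmapPositionDeleteIndex is package-private):

    // Illustrative sketch, not part of this commit.
    PositionDeleteIndex index = new BitmapPositionDeleteIndex();
    index.delete(3L);          // mark a single deleted row position
    index.delete(10L, 13L);    // mark positions 10..12; posEnd is exclusive per the javadoc
    boolean skip = index.deleted(3L);   // true, so the reader would drop this row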
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index e90cb8f..d8d2d68 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -23,6 +23,7 @@ import java.util.function.Function;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -37,6 +38,15 @@ public class TableScanUtil {
return task.files().stream().anyMatch(TableScanUtil::hasDeletes);
}
+ /**
+ * This is temporarily introduced since we plan to support pos-delete vectorized reads first, then add
+ * equality-delete support. This method will be removed once both are supported.
+ */
+ public static boolean hasEqDeletes(CombinedScanTask task) {
+ return task.files().stream().anyMatch(
+ t -> t.deletes().stream().anyMatch(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES)));
+ }
+
public static boolean hasDeletes(FileScanTask task) {
return !task.deletes().isEmpty();
}
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index c4123d5..cf26172 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
@@ -69,6 +70,8 @@ public abstract class DeleteFilter<T> {
private final Schema requiredSchema;
private final Accessor<StructLike> posAccessor;
+ private PositionDeleteIndex deleteRowPositions = null;
+
protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
this.dataFile = task.file();
@@ -98,6 +101,10 @@ public abstract class DeleteFilter<T> {
return requiredSchema;
}
+ public boolean hasPosDeletes() {
+ return !posDeletes.isEmpty();
+ }
+
Accessor<StructLike> posAccessor() {
return posAccessor;
}
@@ -185,6 +192,18 @@ public abstract class DeleteFilter<T> {
return remainingRowsFilter.filter(records);
}
+ public PositionDeleteIndex deletedRowPositions() {
+ if (posDeletes.isEmpty()) {
+ return null;
+ }
+
+ if (deleteRowPositions == null) {
+ List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+ deleteRowPositions = Deletes.toPositionBitmap(dataFile.path(), deletes);
+ }
+ return deleteRowPositions;
+ }
+
private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
if (posDeletes.isEmpty()) {
return records;
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index 69b0a57..75e1657 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -75,9 +75,9 @@ public abstract class DeleteReadTests {
protected String dateTableName = null;
protected Table table = null;
protected Table dateTable = null;
- private List<Record> records = null;
+ protected List<Record> records = null;
private List<Record> dateRecords = null;
- private DataFile dataFile = null;
+ protected DataFile dataFile = null;
@Before
public void writeTestDataFile() throws IOException {
@@ -299,6 +299,39 @@ public abstract class DeleteReadTests {
}
@Test
+ public void testMultiplePosDeleteFiles() throws IOException {
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 6L) // id = 122
+ );
+
+ posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, table, "*");
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ @Test
public void testMixedPositionAndEqualityDeletes() throws IOException {
Schema dataSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(dataSchema);
@@ -411,7 +444,7 @@ public abstract class DeleteReadTests {
return set;
}
- private static StructLikeSet rowSetWithoutIds(Table table, List<Record> recordList, int... idsToRemove) {
+ protected static StructLikeSet rowSetWithoutIds(Table table, List<Record> recordList, int... idsToRemove) {
Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
recordList.stream()
diff --git a/spark/v2.4/build.gradle b/spark/v2.4/build.gradle
index 0789f12..22f4abb 100644
--- a/spark/v2.4/build.gradle
+++ b/spark/v2.4/build.gradle
@@ -65,6 +65,7 @@ project(':iceberg-spark:iceberg-spark2') {
compileOnly "org.apache.avro:avro"
compileOnly("org.apache.spark:spark-hive_2.11") {
exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.roaringbitmap'
}
implementation("org.apache.orc:orc-core::nohive") {
@@ -159,6 +160,7 @@ project(':iceberg-spark:iceberg-spark-runtime') {
relocate 'org.apache.arrow', 'org.apache.iceberg.shaded.org.apache.arrow'
relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch'
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
+ relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
classifier null
}
diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle
index 7df4c62..2ad0fb0 100644
--- a/spark/v3.0/build.gradle
+++ b/spark/v3.0/build.gradle
@@ -64,6 +64,7 @@ project(':iceberg-spark:iceberg-spark3') {
compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
+ exclude group: 'org.roaringbitmap'
}
implementation("org.apache.orc:orc-core::nohive") {
@@ -134,6 +135,7 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
+ exclude group: 'org.roaringbitmap'
}
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
@@ -242,6 +244,7 @@ project(':iceberg-spark:iceberg-spark3-runtime') {
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
// relocate Antlr runtime and related deps to shade Iceberg specific version
relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4'
+ relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
classifier null
}
diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle
index 94befdf..510185a 100644
--- a/spark/v3.1/build.gradle
+++ b/spark/v3.1/build.gradle
@@ -64,6 +64,7 @@ project(':iceberg-spark:iceberg-spark-3.1') {
compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
+ exclude group: 'org.roaringbitmap'
}
implementation("org.apache.orc:orc-core::nohive") {
@@ -134,6 +135,7 @@ project(":iceberg-spark:iceberg-spark-3.1-extensions") {
compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
+ exclude group: 'org.roaringbitmap'
}
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
@@ -242,6 +244,7 @@ project(':iceberg-spark:iceberg-spark-3.1-runtime') {
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
// relocate Antlr runtime and related deps to shade Iceberg specific version
relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4'
+ relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
classifier null
}
diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle
index 2a2ff62..8b0200f 100644
--- a/spark/v3.2/build.gradle
+++ b/spark/v3.2/build.gradle
@@ -66,6 +66,7 @@ project(':iceberg-spark:iceberg-spark-3.2') {
exclude group: 'org.apache.arrow'
// to make sure io.netty.buffer only comes from project(':iceberg-arrow')
exclude group: 'io.netty', module: 'netty-buffer'
+ exclude group: 'org.roaringbitmap'
}
implementation("org.apache.orc:orc-core::nohive") {
@@ -138,6 +139,7 @@ project(":iceberg-spark:iceberg-spark-3.2-extensions") {
exclude group: 'org.apache.arrow'
// to make sure io.netty.buffer only comes from project(':iceberg-arrow')
exclude group: 'io.netty', module: 'netty-buffer'
+ exclude group: 'org.roaringbitmap'
}
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
@@ -244,6 +246,7 @@ project(':iceberg-spark:iceberg-spark-3.2-runtime') {
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
// relocate Antlr runtime and related deps to shade Iceberg specific version
relocate 'org.antlr.v4', 'org.apache.iceberg.shaded.org.antlr.v4'
+ relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
classifier null
}
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
index 2be6a8c..ca1e92f 100644
--- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateProperties;
@@ -187,4 +188,8 @@ public abstract class IcebergSourceBenchmark {
restoreProperties.commit();
}
}
+
+ protected FileFormat fileFormat() {
+ throw new UnsupportedOperationException("Unsupported file format");
+ }
}
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
new file mode 100644
index 0000000..d5a6db3
--- /dev/null
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+public abstract class IcebergSourceDeleteBenchmark extends IcebergSourceBenchmark {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class);
+ private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024;
+
+ protected static final int NUM_FILES = 1;
+ protected static final int NUM_ROWS = 10 * 1000 * 1000;
+
+ @Setup
+ public void setupBenchmark() throws IOException {
+ setupSpark();
+ appendData();
+ }
+
+ @TearDown
+ public void tearDownBenchmark() throws IOException {
+ tearDownSpark();
+ cleanupFiles();
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readIceberg() {
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+ withTableProperties(tableProperties, () -> {
+ String tableLocation = table().location();
+ Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+ materialize(df);
+ });
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readIcebergVectorized() {
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+ tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true");
+ withTableProperties(tableProperties, () -> {
+ String tableLocation = table().location();
+ Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+ materialize(df);
+ });
+ }
+
+ protected abstract void appendData() throws IOException;
+
+ protected void writeData(int fileNum) {
+ Dataset<Row> df = spark().range(NUM_ROWS)
+ .withColumnRenamed("id", "longCol")
+ .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)"))
+ .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+ .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+ .withColumn("dateCol", date_add(current_date(), fileNum))
+ .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+ .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));
+ appendAsFile(df);
+ }
+
+ @Override
+ protected Table initTable() {
+ Schema schema = new Schema(
+ required(1, "longCol", Types.LongType.get()),
+ required(2, "intCol", Types.IntegerType.get()),
+ required(3, "floatCol", Types.FloatType.get()),
+ optional(4, "doubleCol", Types.DoubleType.get()),
+ optional(6, "dateCol", Types.DateType.get()),
+ optional(7, "timestampCol", Types.TimestampType.withZone()),
+ optional(8, "stringCol", Types.StringType.get()));
+ PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+ HadoopTables tables = new HadoopTables(hadoopConf());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+ properties.put(TableProperties.FORMAT_VERSION, "2");
+ return tables.create(schema, partitionSpec, properties, newTableLocation());
+ }
+
+ @Override
+ protected Configuration initHadoopConf() {
+ return new Configuration();
+ }
+
+ protected void writePosDeletes(CharSequence path, long numRows, double percentage) throws IOException {
+ writePosDeletes(path, numRows, percentage, 1);
+ }
+
+ protected void writePosDeletes(CharSequence path, long numRows, double percentage,
+ int numDeleteFile) throws IOException {
+ writePosDeletesWithNoise(path, numRows, percentage, 0, numDeleteFile);
+ }
+
+ protected void writePosDeletesWithNoise(CharSequence path, long numRows, double percentage, int numNoise,
+ int numDeleteFile) throws IOException {
+ Set<Long> deletedPos = Sets.newHashSet();
+ while (deletedPos.size() < numRows * percentage) {
+ deletedPos.add(ThreadLocalRandom.current().nextLong(numRows));
+ }
+ LOG.info("pos delete row count: {}, num of delete files: {}", deletedPos.size(), numDeleteFile);
+
+ int partitionSize = (int) (numRows * percentage) / numDeleteFile;
+ Iterable<List<Long>> sets = Iterables.partition(deletedPos, partitionSize);
+ for (List<Long> item : sets) {
+ writePosDeletes(path, item, numNoise);
+ }
+ }
+
+ protected void writePosDeletes(CharSequence path, List<Long> deletedPos, int numNoise) throws IOException {
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+ .dataFileFormat(fileFormat())
+ .build();
+
+ ClusteredPositionDeleteWriter<InternalRow> writer = new ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, table().io(),
+ fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+ PartitionSpec unpartitionedSpec = table().specs().get(0);
+
+ PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+ try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
+ for (Long pos : deletedPos) {
+ positionDelete.set(path, pos, null);
+ closeableWriter.write(positionDelete, unpartitionedSpec, null);
+ for (int i = 0; i < numNoise; i++) {
+ positionDelete.set(noisePath(path), pos, null);
+ closeableWriter.write(positionDelete, unpartitionedSpec, null);
+ }
+ }
+ }
+
+ RowDelta rowDelta = table().newRowDelta();
+ writer.result().deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.validateDeletedFiles().commit();
+ }
+
+ private OutputFileFactory newFileFactory() {
+ return OutputFileFactory.builderFor(table(), 1, 1)
+ .format(fileFormat())
+ .build();
+ }
+
+ private CharSequence noisePath(CharSequence path) {
+ // assume the data file name would be something like "00000-0-30da64e0-56b5-4743-a11b-3188a1695bf7-00001.parquet"
+ // so the dataFileSuffixLen is roughly the UUID string length + the length of "-00001.parquet", i.e. 36 + 14 = 50;
+ // 60 is used since it's OK to be inexact here.
+ int dataFileSuffixLen = 60;
+ UUID uuid = UUID.randomUUID();
+ if (path.length() > dataFileSuffixLen) {
+ return path.subSequence(0, dataFileSuffixLen) + uuid.toString();
+ } else {
+ return uuid.toString();
+ }
+ }
+}
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java
new file mode 100644
index 0000000..234c6c5
--- /dev/null
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetDeleteBenchmark.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark;
+import org.openjdk.jmh.annotations.Param;
+
+/**
+ * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for
+ * Iceberg.
+ * <p>
+ * This class uses a dataset with a flat schema.
+ * To run this benchmark for spark-3.2:
+ * <code>
+ * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh
+ * -PjmhIncludeRegex=IcebergSourceParquetDeleteBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-source-parquet-delete-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceParquetDeleteBenchmark extends IcebergSourceDeleteBenchmark {
+ @Param({"0", "0.000001", "0.05", "0.25", "0.5", "1"})
+ private double percentDeleteRow;
+
+ @Override
+ protected void appendData() throws IOException {
+ for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+ writeData(fileNum);
+
+ if (percentDeleteRow > 0) {
+ // add pos-deletes
+ table().refresh();
+ for (DataFile file : table().currentSnapshot().addedFiles()) {
+ writePosDeletes(file.path(), NUM_ROWS, percentDeleteRow);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected FileFormat fileFormat() {
+ return FileFormat.PARQUET;
+ }
+}
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java
new file mode 100644
index 0000000..fb39331
--- /dev/null
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark;
+import org.openjdk.jmh.annotations.Param;
+
+/**
+ * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for
+ * Iceberg.
+ * <p>
+ * This class uses a dataset with a flat schema.
+ * To run this benchmark for spark-3.2:
+ * <code>
+ * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh
+ * -PjmhIncludeRegex=IcebergSourceParquetMultiDeleteFileBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-source-parquet-multi-delete-file-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceParquetMultiDeleteFileBenchmark extends IcebergSourceDeleteBenchmark {
+ @Param({"1", "2", "5", "10"})
+ private int numDeleteFile;
+
+ @Override
+ protected void appendData() throws IOException {
+ for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+ writeData(fileNum);
+
+ table().refresh();
+ for (DataFile file : table().currentSnapshot().addedFiles()) {
+ writePosDeletes(file.path(), NUM_ROWS, 0.25, numDeleteFile);
+ }
+ }
+ }
+
+ @Override
+ protected FileFormat fileFormat() {
+ return FileFormat.PARQUET;
+ }
+}
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java
new file mode 100644
index 0000000..a06cadb
--- /dev/null
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import java.io.IOException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.IcebergSourceDeleteBenchmark;
+import org.openjdk.jmh.annotations.Param;
+
+/**
+ * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the Spark data source for
+ * Iceberg.
+ * <p>
+ * This class uses a dataset with a flat schema.
+ * To run this benchmark for spark-3.2:
+ * <code>
+ * ./gradlew :iceberg-spark:iceberg-spark-3.2:jmh
+ * -PjmhIncludeRegex=IcebergSourceParquetWithUnrelatedDeleteBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-source-parquet-with-unrelated-delete-benchmark-result.txt
+ * </code>
+ */
+public class IcebergSourceParquetWithUnrelatedDeleteBenchmark extends IcebergSourceDeleteBenchmark {
+ private static final double PERCENT_DELETE_ROW = 0.05;
+ @Param({"0", "0.05", "0.25", "0.5"})
+ private double percentUnrelatedDeletes;
+
+ @Override
+ protected void appendData() throws IOException {
+ for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+ writeData(fileNum);
+
+ table().refresh();
+ for (DataFile file : table().currentSnapshot().addedFiles()) {
+ writePosDeletesWithNoise(file.path(), NUM_ROWS, PERCENT_DELETE_ROW,
+ (int) (percentUnrelatedDeletes / PERCENT_DELETE_ROW), 1);
+ }
+ }
+ }
+
+ @Override
+ protected FileFormat fileFormat() {
+ return FileFormat.PARQUET;
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java
new file mode 100644
index 0000000..7804c02
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnVectorWithFilter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class ColumnVectorWithFilter extends IcebergArrowColumnVector {
+ private final int[] rowIdMapping;
+
+ public ColumnVectorWithFilter(VectorHolder holder, int[] rowIdMapping) {
+ super(holder);
+ this.rowIdMapping = rowIdMapping;
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ return nullabilityHolder().isNullAt(rowIdMapping[rowId]) == 1;
+ }
+
+ @Override
+ public boolean getBoolean(int rowId) {
+ return accessor().getBoolean(rowIdMapping[rowId]);
+ }
+
+ @Override
+ public int getInt(int rowId) {
+ return accessor().getInt(rowIdMapping[rowId]);
+ }
+
+ @Override
+ public long getLong(int rowId) {
+ return accessor().getLong(rowIdMapping[rowId]);
+ }
+
+ @Override
+ public float getFloat(int rowId) {
+ return accessor().getFloat(rowIdMapping[rowId]);
+ }
+
+ @Override
+ public double getDouble(int rowId) {
+ return accessor().getDouble(rowIdMapping[rowId]);
+ }
+
+ @Override
+ public ColumnarArray getArray(int rowId) {
+ if (isNullAt(rowId)) {
+ return null;
+ }
+ return accessor().getArray(rowIdMapping[rowId]);
+ }
+
+ @Override
+ public Decimal getDecimal(int rowId, int precision, int scale) {
+ if (isNullAt(rowId)) {
+ return null;
+ }
+ return accessor().getDecimal(rowIdMapping[rowId], precision, scale);
+ }
+
+ @Override
+ public UTF8String getUTF8String(int rowId) {
+ if (isNullAt(rowId)) {
+ return null;
+ }
+ return accessor().getUTF8String(rowIdMapping[rowId]);
+ }
+
+ @Override
+ public byte[] getBinary(int rowId) {
+ if (isNullAt(rowId)) {
+ return null;
+ }
+ return accessor().getBinary(rowIdMapping[rowId]);
+ }
+
+ public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) {
+ return holder.isDummy() ?
+ new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) :
+ new ColumnVectorWithFilter(holder, rowIdMapping);
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index f71a696..cc4858f 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -20,10 +20,18 @@
package org.apache.iceberg.spark.data.vectorized;
import java.util.List;
+import java.util.Map;
import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -33,12 +41,25 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
* {@linkplain VectorizedArrowReader VectorReader(s)}.
*/
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
+ private DeleteFilter<InternalRow> deletes = null;
+ private long rowStartPosInBatch = 0;
public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
super(readers);
}
@Override
+ public void setRowGroupInfo(PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData,
+ long rowPosition) {
+ super.setRowGroupInfo(pageStore, metaData, rowPosition);
+ this.rowStartPosInBatch = rowPosition;
+ }
+
+ public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
+ this.deletes = deleteFilter;
+ }
+
+ @Override
public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];
@@ -47,6 +68,8 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
closeVectors();
}
+ Pair<int[], Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
int numRowsInVector = vectorHolders[i].numValues();
@@ -54,11 +77,64 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
numRowsInVector == numRowsToRead,
"Number of rows in the vector %s didn't match expected %s ", numRowsInVector,
numRowsToRead);
- arrowColumnVectors[i] =
- IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+
+ if (rowIdMapping == null) {
+ arrowColumnVectors[i] = IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+ } else {
+ int[] rowIdMap = rowIdMapping.first();
+ Integer numRows = rowIdMapping.second();
+ arrowColumnVectors[i] = ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows);
+ }
}
+
+ rowStartPosInBatch += numRowsToRead;
ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
- batch.setNumRows(numRowsToRead);
+
+ if (rowIdMapping == null) {
+ batch.setNumRows(numRowsToRead);
+ } else {
+ Integer numRows = rowIdMapping.second();
+ batch.setNumRows(numRows);
+ }
return batch;
}
+
+ private Pair<int[], Integer> rowIdMapping(int numRows) {
+ if (deletes != null && deletes.hasPosDeletes()) {
+ return buildRowIdMapping(deletes.deletedRowPositions(), numRows);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Build a row id mapping inside a batch, which skips deleted rows. For example, if the 1st and 3rd rows are deleted in
+ * a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new number of rows is 3.
+ * @param deletedRowPositions a set of deleted row positions
+ * @param numRows the num of rows
+ * @return the mapping array and the new num of rows in a batch, null if no row is deleted
+ */
+ private Pair<int[], Integer> buildRowIdMapping(PositionDeleteIndex deletedRowPositions, int numRows) {
+ if (deletedRowPositions == null) {
+ return null;
+ }
+
+ int[] rowIdMapping = new int[numRows];
+ int originalRowId = 0;
+ int currentRowId = 0;
+ while (originalRowId < numRows) {
+ if (!deletedRowPositions.deleted(originalRowId + rowStartPosInBatch)) {
+ rowIdMapping[currentRowId] = originalRowId;
+ currentRowId++;
+ }
+ originalRowId++;
+ }
+
+ if (currentRowId == numRows) {
+ // there is no delete in this batch
+ return null;
+ } else {
+ return Pair.of(rowIdMapping, currentRowId);
+ }
+ }
}
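The mapping described above can also be sketched in isolation (a hypothetical standalone helper, not the committed implementation): for a 5-row batch with positions 0 and 2 deleted, the surviving original row ids are packed to the front and the live row count becomes 3.

    // Hypothetical sketch of the row id mapping that ColumnarBatchReader builds above.
    static int[] buildMapping(java.util.Set<Long> deletedPositions, long batchStartPos, int numRows) {
      int[] mapping = new int[numRows];
      int liveRows = 0;
      for (int originalRowId = 0; originalRowId < numRows; originalRowId++) {
        if (!deletedPositions.contains(batchStartPos + originalRowId)) {
          mapping[liveRows++] = originalRowId;   // surviving rows are packed to the front
        }
      }
      return java.util.Arrays.copyOf(mapping, liveRows);
    }
    // buildMapping(Set.of(0L, 2L), 0, 5) returns [1, 3, 4], i.e. {0->1, 1->3, 2->4} with 3 live rows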
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
index 514eec8..c5b7853 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
@@ -48,6 +48,14 @@ public class IcebergArrowColumnVector extends ColumnVector {
this.accessor = ArrowVectorAccessors.getVectorAccessor(holder);
}
+ protected ArrowVectorAccessor<Decimal, UTF8String, ColumnarArray, ArrowColumnVector> accessor() {
+ return accessor;
+ }
+
+ protected NullabilityHolder nullabilityHolder() {
+ return nullabilityHolder;
+ }
+
@Override
public void close() {
accessor.close();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index b2d5823..020b35f 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -19,12 +19,17 @@
package org.apache.iceberg.spark.data.vectorized;
+import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
+import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.catalyst.InternalRow;
public class VectorizedSparkParquetReaders {
@@ -49,4 +54,39 @@ public class VectorizedSparkParquetReaders {
expectedSchema, fileSchema, setArrowValidityVector,
idToConstant, ColumnarBatchReader::new));
}
+
+ public static ColumnarBatchReader buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ boolean setArrowValidityVector,
+ Map<Integer, ?> idToConstant,
+ DeleteFilter<InternalRow> deleteFilter) {
+ return (ColumnarBatchReader)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReaderBuilder(
+ expectedSchema, fileSchema, setArrowValidityVector,
+ idToConstant, ColumnarBatchReader::new, deleteFilter));
+ }
+
+ private static class ReaderBuilder extends VectorizedReaderBuilder {
+ private final DeleteFilter<InternalRow> deleteFilter;
+
+ ReaderBuilder(Schema expectedSchema,
+ MessageType parquetSchema,
+ boolean setArrowValidityVector,
+ Map<Integer, ?> idToConstant,
+ Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
+ DeleteFilter<InternalRow> deleteFilter) {
+ super(expectedSchema, parquetSchema, setArrowValidityVector, idToConstant, readerFactory);
+ this.deleteFilter = deleteFilter;
+ }
+
+ @Override
+ protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
+ VectorizedReader<?> reader = super.vectorizedReader(reorderedFields);
+ if (deleteFilter != null) {
+ ((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter);
+ }
+ return reader;
+ }
+ }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index b58745c..f0664c7 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -91,6 +91,10 @@ abstract class BaseDataReader<T> implements Closeable {
this.currentIterator = CloseableIterator.empty();
}
+ protected Table table() {
+ return table;
+ }
+
public boolean next() throws IOException {
try {
while (true) {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index e4bd3ce..5f05c55 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -28,8 +28,10 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
@@ -38,10 +40,12 @@ import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;
class BatchDataReader extends BaseDataReader<ColumnarBatch> {
@@ -71,11 +75,14 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
InputFile location = getInputFile(task);
Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
if (task.file().format() == FileFormat.PARQUET) {
+ SparkDeleteFilter deleteFilter = deleteFilter(task);
+
Parquet.ReadBuilder builder = Parquet.read(location)
.project(expectedSchema)
.split(task.start(), task.length())
.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema,
- fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant))
+ fileSchema, /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED, idToConstant,
+ deleteFilter))
.recordsPerBatch(batchSize)
.filter(task.residual())
.caseSensitive(caseSensitive)
@@ -114,4 +121,27 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
}
return iter.iterator();
}
+
+ private SparkDeleteFilter deleteFilter(FileScanTask task) {
+ return task.deletes().isEmpty() ? null : new SparkDeleteFilter(task, table().schema(), expectedSchema);
+ }
+
+ private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
+ private final InternalRowWrapper asStructLike;
+
+ SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+ super(task, tableSchema, requestedSchema);
+ this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+ }
+
+ @Override
+ protected StructLike asStructLike(InternalRow row) {
+ return asStructLike.wrap(row);
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return BatchDataReader.this.getInputFile(location);
+ }
+ }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index bb7ac94..e1420ed 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -175,10 +175,15 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
+ boolean hasNoEqDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasEqDeletes);
+
boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks);
- boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
- (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
+ boolean batchReadOrc = hasNoDeleteFiles && allOrcFileScanTasks;
+
+ boolean batchReadParquet = hasNoEqDeleteFiles && allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives;
+
+ boolean readUsingBatch = batchReadsEnabled && (batchReadOrc || batchReadParquet);
int batchSize = readUsingBatch ? batchSize(allParquetFileScanTasks, allOrcFileScanTasks) : 0;
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index b68db02..8e42c6e 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -24,12 +24,15 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.arrow.vector.NullCheckingForGet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
@@ -38,6 +41,8 @@ import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.Types;
@@ -60,6 +65,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestSparkParquetReadMetadataColumns {
@@ -166,6 +173,56 @@ public class TestSparkParquetReadMetadataColumns {
}
@Test
+ public void testReadRowNumbersWithDelete() throws IOException {
+ if (vectorized) {
+ List<InternalRow> expectedRowsAfterDelete = new ArrayList<>(EXPECTED_ROWS);
+ // remove rows at positions 98, 99, 100, 101, 102; this crosses two row groups [0, 100) and [100, 200)
+ for (int i = 1; i <= 5; i++) {
+ expectedRowsAfterDelete.remove(98);
+ }
+
+ Parquet.ReadBuilder builder = Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA);
+
+ DeleteFilter deleteFilter = mock(DeleteFilter.class);
+ when(deleteFilter.hasPosDeletes()).thenReturn(true);
+ PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
+ deletedRowPos.delete(98, 103);
+ when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos);
+
+ builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(PROJECTION_SCHEMA,
+ fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED, Maps.newHashMap(), deleteFilter));
+ builder.recordsPerBatch(RECORDS_PER_BATCH);
+
+ validate(expectedRowsAfterDelete, builder);
+ }
+ }
+
+ private class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+ private final Set<Long> deleteIndex;
+
+ private CustomizedPositionDeleteIndex() {
+ deleteIndex = Sets.newHashSet();
+ }
+
+ @Override
+ public void delete(long position) {
+ deleteIndex.add(position);
+ }
+
+ @Override
+ public void delete(long posStart, long posEnd) {
+ for (long l = posStart; l < posEnd; l++) {
+ delete(l);
+ }
+ }
+
+ @Override
+ public boolean deleted(long position) {
+ return deleteIndex.contains(position);
+ }
+ }
+
+ @Test
public void testReadRowNumbersWithFilter() throws IOException {
// current iceberg supports row group filter.
for (int i = 1; i < 5; i += 1) {
@@ -212,6 +269,10 @@ public class TestSparkParquetReadMetadataColumns {
builder = builder.split(splitStart, splitLength);
}
+ validate(expected, builder);
+ }
+
+ private void validate(List<InternalRow> expected, Parquet.ReadBuilder builder) throws IOException {
try (CloseableIterable<InternalRow> reader = vectorized ? batchesToRows(builder.build()) : builder.build()) {
final Iterator<InternalRow> actualRows = reader.iterator();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index b7398b7..e1c6355 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -49,6 +49,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.Dataset;
@@ -59,14 +61,30 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+@RunWith(Parameterized.class)
public class TestSparkReaderDeletes extends DeleteReadTests {
private static TestHiveMetastore metastore = null;
protected static SparkSession spark = null;
protected static HiveCatalog catalog = null;
+ private final boolean vectorized;
+
+ public TestSparkReaderDeletes(boolean vectorized) {
+ this.vectorized = vectorized;
+ }
+
+ @Parameterized.Parameters(name = "vectorized = {0}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ new Object[] {false},
+ new Object[] {true}
+ };
+ }
@BeforeClass
public static void startMetastoreAndSpark() {
@@ -106,7 +124,12 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
ops.commit(meta, meta.upgradeToFormatVersion(2));
-
+ if (vectorized) {
+ table.updateProperties()
+ .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
+ .set(TableProperties.PARQUET_BATCH_SIZE, "4") // split 7 records into two batches to cover more code paths
+ .commit();
+ }
return table;
}
@@ -215,4 +238,28 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size());
Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet);
}
+
+ @Test
+ public void testPosDeletesAllRowsInBatch() throws IOException {
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = rowSetWithoutIds(table, records, 29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, table, "*");
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
}
diff --git a/versions.props b/versions.props
index fa6505a..0becbf5 100644
--- a/versions.props
+++ b/versions.props
@@ -16,6 +16,7 @@ com.google.guava:guava = 28.0-jre
com.github.ben-manes.caffeine:caffeine = 2.8.4
org.apache.arrow:arrow-vector = 6.0.0
org.apache.arrow:arrow-memory-netty = 6.0.0
+org.roaringbitmap:RoaringBitmap = 0.9.0
io.netty:netty-buffer = 4.1.63.Final
com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1
com.aliyun.oss:aliyun-sdk-oss = 3.10.2