Posted to commits@iceberg.apache.org by op...@apache.org on 2021/03/13 02:40:42 UTC
[iceberg] branch master updated: Core: Add EqualityDeleteRowReader (#2320)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 713136d Core: Add EqualityDeleteRowReader (#2320)
713136d is described below
commit 713136df34a65df4306a7b98c67c4b6d11cc65ad
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Sat Mar 13 10:40:24 2021 +0800
Core: Add EqualityDeleteRowReader (#2320)
---
.../java/org/apache/iceberg/data/DeleteFilter.java | 46 ++++++++++++++---
.../org/apache/iceberg/data/DeleteReadTests.java | 9 ++++
.../spark/source/EqualityDeleteRowReader.java | 57 ++++++++++++++++++++++
.../apache/iceberg/spark/source/RowDataReader.java | 8 ++-
.../spark/source/TestSparkReaderDeletes.java | 57 ++++++++++++++++++++++
5 files changed, 169 insertions(+), 8 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 1ddfa27..7a07529 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -48,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;
import org.apache.parquet.Preconditions;
@@ -110,9 +112,10 @@ public abstract class DeleteFilter<T> {
return applyEqDeletes(applyPosDeletes(records));
}
- private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+ private List<Predicate<T>> applyEqDeletes() {
+ List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
if (eqDeletes.isEmpty()) {
- return records;
+ return isInDeleteSets;
}
Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
@@ -120,7 +123,6 @@ public abstract class DeleteFilter<T> {
filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
}
- CloseableIterable<T> filteredRecords = records;
for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
Set<Integer> ids = entry.getKey();
Iterable<DeleteFile> deletes = entry.getValue();
@@ -137,11 +139,43 @@ public abstract class DeleteFilter<T> {
CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
deleteSchema.asStruct());
- filteredRecords = Deletes.filter(filteredRecords,
- record -> projectRow.wrap(asStructLike(record)), deleteSet);
+ Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
+ isInDeleteSets.add(isInDeleteSet);
}
- return filteredRecords;
+ return isInDeleteSets;
+ }
+
+ public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+ // Predicate to test whether a row has been deleted by equality deletions.
+ Predicate<T> deletedRows = applyEqDeletes().stream()
+ .reduce(Predicate::or)
+ .orElse(t -> false);
+
+ Filter<T> deletedRowsFilter = new Filter<T>() {
+ @Override
+ protected boolean shouldKeep(T item) {
+ return deletedRows.test(item);
+ }
+ };
+ return deletedRowsFilter.filter(records);
+ }
+
+ private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+ // Predicate to test whether a row should be visible to the user after applying equality deletions.
+ Predicate<T> remainingRows = applyEqDeletes().stream()
+ .map(Predicate::negate)
+ .reduce(Predicate::and)
+ .orElse(t -> true);
+
+ Filter<T> remainingRowsFilter = new Filter<T>() {
+ @Override
+ protected boolean shouldKeep(T item) {
+ return remainingRows.test(item);
+ }
+ };
+
+ return remainingRowsFilter.filter(records);
}
private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
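The core of the DeleteFilter change is that applyEqDeletes() now returns the per-delete-file predicates instead of a filtered iterable, so the same list can be reduced two ways: findEqualityDeleteRows() keeps a row when any predicate matches (Predicate::or), while the read path keeps a row only when none match (Predicate::negate, then Predicate::and). A minimal standalone sketch of that composition, using plain integer ids in place of Iceberg rows (illustration only, not part of the commit):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    public class EqDeletePredicateSketch {
      public static void main(String[] args) {
        // Stand-ins for the per-delete-file predicates built by applyEqDeletes():
        // one keyed on "data" (hitting ids 29, 89) and one keyed on "id" (ids 121, 122).
        List<Predicate<Integer>> isInDeleteSets = Arrays.asList(
            id -> id == 29 || id == 89,
            id -> id == 121 || id == 122);

        // findEqualityDeleteRows(): a row is returned if ANY delete set contains it.
        Predicate<Integer> deletedRows = isInDeleteSets.stream()
            .reduce(Predicate::or)
            .orElse(t -> false);

        // applyEqDeletes(records): a row survives only if NO delete set contains it.
        Predicate<Integer> remainingRows = isInDeleteSets.stream()
            .map(Predicate::negate)
            .reduce(Predicate::and)
            .orElse(t -> true);

        List<Integer> rows = Arrays.asList(29, 50, 89, 100, 121, 122);
        System.out.println(rows.stream().filter(deletedRows).collect(Collectors.toList()));   // [29, 89, 121, 122]
        System.out.println(rows.stream().filter(remainingRows).collect(Collectors.toList())); // [50, 100]
      }
    }

Note how the orElse defaults preserve the empty case: with no equality deletes, findEqualityDeleteRows() yields nothing and the read path keeps everything, matching the eqDeletes.isEmpty() early return above.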
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index b373a33..1897baf 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -328,4 +328,13 @@ public abstract class DeleteReadTests {
.forEach(set::add);
return set;
}
+
+ protected StructLikeSet rowSetWithIds(int... idsToRetain) {
+ Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRetain));
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ records.stream()
+ .filter(row -> deletedIds.contains(row.getField("id")))
+ .forEach(set::add);
+ return set;
+ }
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
new file mode 100644
index 0000000..a85d85d
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class EqualityDeleteRowReader extends RowDataReader {
+ private final Schema expectedSchema;
+
+ public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
+ FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
+ super(task, schema, schema, nameMapping, io, encryptionManager, caseSensitive);
+ this.expectedSchema = expectedSchema;
+ }
+
+ @Override
+ CloseableIterator<InternalRow> open(FileScanTask task) {
+ SparkDeleteFilter matches = new SparkDeleteFilter(task, tableSchema(), expectedSchema);
+
+ // schema of rows returned by readers
+ Schema requiredSchema = matches.requiredSchema();
+ Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+ DataFile file = task.file();
+
+ // update the current file for Spark's filename() function
+ InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
+
+ return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator();
+ }
+}
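A hedged usage sketch for the new reader (not part of the commit; it assumes a loaded Table and an already-planned CombinedScanTask, and mirrors the test added further down). The reader inverts the normal read path: it yields only the rows that at least one equality delete matched:

    import java.io.IOException;
    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.spark.sql.catalyst.InternalRow;

    public class EqualityDeleteScanSketch {
      static void printEqualityDeletedRows(Table table, CombinedScanTask task) throws IOException {
        try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(
            task, table.schema(), table.schema(),
            table.properties().get(TableProperties.DEFAULT_NAME_MAPPING),
            table.io(), table.encryption(), false /* caseSensitive */)) {
          while (reader.next()) {
            InternalRow deleted = reader.get(); // a row hit by at least one equality delete
            System.out.println(deleted);
          }
        }
      }
    }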
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index b021c40..7185a24 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -95,7 +95,11 @@ class RowDataReader extends BaseDataReader<InternalRow> {
return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
}
- private CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
+ protected Schema tableSchema() {
+ return tableSchema;
+ }
+
+ protected CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
CloseableIterable<InternalRow> iter;
if (task.isDataTask()) {
iter = newDataIterable(task.asDataTask(), readSchema);
@@ -215,7 +219,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
}
- private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
+ protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;
SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 34070e1..983941a 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
@@ -31,6 +32,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -41,11 +43,14 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkStructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -56,6 +61,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
public abstract class TestSparkReaderDeletes extends DeleteReadTests {
@@ -161,4 +167,55 @@ public abstract class TestSparkReaderDeletes extends DeleteReadTests {
Assert.assertEquals("Table should contain no rows", 0, actual.size());
}
+
+ @Test
+ public void testReadEqualityDeleteRows() throws IOException {
+ Schema deleteSchema1 = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteSchema1);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d") // id = 89
+ );
+
+ Schema deleteSchema2 = table.schema().select("id");
+ Record idDelete = GenericRecord.create(deleteSchema2);
+ List<Record> idDeletes = Lists.newArrayList(
+ idDelete.copy("id", 121), // id = 121
+ idDelete.copy("id", 122) // id = 122
+ );
+
+ DeleteFile eqDelete1 = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteSchema1);
+
+ DeleteFile eqDelete2 = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), idDeletes, deleteSchema2);
+
+ table.newRowDelta()
+ .addDeletes(eqDelete1)
+ .addDeletes(eqDelete2)
+ .commit();
+
+ StructLikeSet expectedRowSet = rowSetWithIds(29, 89, 121, 122);
+
+ Types.StructType type = table.schema().asStruct();
+ StructLikeSet actualRowSet = StructLikeSet.create(type);
+
+ CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
+ table.newScan().planFiles(),
+ TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
+ TableProperties.SPLIT_LOOKBACK_DEFAULT,
+ TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+ for (CombinedScanTask task : tasks) {
+ try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table.schema(), table.schema(),
+ table.properties().get(DEFAULT_NAME_MAPPING), table.io(), table.encryption(), false)) {
+ while (reader.next()) {
+ actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
+ }
+ }
+ }
+
+ Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size());
+ Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet);
+ }
}