You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/03/13 02:40:42 UTC

[iceberg] branch master updated: Core: Add EqualityDeleteRowReader (#2320)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 713136d  Core: Add EqualityDeleteRowReader (#2320)
713136d is described below

commit 713136df34a65df4306a7b98c67c4b6d11cc65ad
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Sat Mar 13 10:40:24 2021 +0800

    Core: Add EqualityDeleteRowReader (#2320)
---
 .../java/org/apache/iceberg/data/DeleteFilter.java | 46 ++++++++++++++---
 .../org/apache/iceberg/data/DeleteReadTests.java   |  9 ++++
 .../spark/source/EqualityDeleteRowReader.java      | 57 ++++++++++++++++++++++
 .../apache/iceberg/spark/source/RowDataReader.java |  8 ++-
 .../spark/source/TestSparkReaderDeletes.java       | 57 ++++++++++++++++++++++
 5 files changed, 169 insertions(+), 8 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 1ddfa27..7a07529 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import org.apache.iceberg.Accessor;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -48,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Filter;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructProjection;
 import org.apache.parquet.Preconditions;
@@ -110,9 +112,10 @@ public abstract class DeleteFilter<T> {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  private List<Predicate<T>> applyEqDeletes() {
+    List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
     if (eqDeletes.isEmpty()) {
-      return records;
+      return isInDeleteSets;
     }
 
     Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
@@ -120,7 +123,6 @@ public abstract class DeleteFilter<T> {
       filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
     }
 
-    CloseableIterable<T> filteredRecords = records;
     for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
       Set<Integer> ids = entry.getKey();
       Iterable<DeleteFile> deletes = entry.getValue();
@@ -137,11 +139,43 @@ public abstract class DeleteFilter<T> {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      filteredRecords = Deletes.filter(filteredRecords,
-          record -> projectRow.wrap(asStructLike(record)), deleteSet);
+      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
+      isInDeleteSets.add(isInDeleteSet);
     }
 
-    return filteredRecords;
+    return isInDeleteSets;
+  }
+
+  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+    // Predicate to test whether a row has been deleted by equality deletions.
+    Predicate<T> deletedRows = applyEqDeletes().stream()
+        .reduce(Predicate::or)
+        .orElse(t -> false);
+
+    Filter<T> deletedRowsFilter = new Filter<T>() {
+      @Override
+      protected boolean shouldKeep(T item) {
+        return deletedRows.test(item);
+      }
+    };
+    return deletedRowsFilter.filter(records);
+  }
+
+  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+    // Predicate to test whether a row should be visible to the user after applying equality deletions.
+    Predicate<T> remainingRows = applyEqDeletes().stream()
+        .map(Predicate::negate)
+        .reduce(Predicate::and)
+        .orElse(t -> true);
+
+    Filter<T> remainingRowsFilter = new Filter<T>() {
+      @Override
+      protected boolean shouldKeep(T item) {
+        return remainingRows.test(item);
+      }
+    };
+
+    return remainingRowsFilter.filter(records);
   }
 
   private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index b373a33..1897baf 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -328,4 +328,13 @@ public abstract class DeleteReadTests {
         .forEach(set::add);
     return set;
   }
+
+  protected StructLikeSet rowSetWitIds(int... idsToRetain) {
+    Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRetain));
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    records.stream()
+        .filter(row -> deletedIds.contains(row.getField("id")))
+        .forEach(set::add);
+    return set;
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
new file mode 100644
index 0000000..a85d85d
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class EqualityDeleteRowReader extends RowDataReader {
+  private final Schema expectedSchema;
+
+  public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
+                                 FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
+    super(task, schema, schema, nameMapping, io, encryptionManager, caseSensitive);
+    this.expectedSchema = expectedSchema;
+  }
+
+  @Override
+  CloseableIterator<InternalRow> open(FileScanTask task) {
+    SparkDeleteFilter matches = new SparkDeleteFilter(task, tableSchema(), expectedSchema);
+
+    // schema of rows returned by readers
+    Schema requiredSchema = matches.requiredSchema();
+    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+    DataFile file = task.file();
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
+
+    return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator();
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index b021c40..7185a24 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -95,7 +95,11 @@ class RowDataReader extends BaseDataReader<InternalRow> {
     return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
   }
 
-  private CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
+  protected Schema tableSchema() {
+    return tableSchema;
+  }
+
+  protected CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
     CloseableIterable<InternalRow> iter;
     if (task.isDataTask()) {
       iter = newDataIterable(task.asDataTask(), readSchema);
@@ -215,7 +219,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
         JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
   }
 
-  private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
+  protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
     private final InternalRowWrapper asStructLike;
 
     SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 34070e1..983941a 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
@@ -31,6 +32,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -41,11 +43,14 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkStructLike;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -56,6 +61,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
 
 public abstract class TestSparkReaderDeletes extends DeleteReadTests {
 
@@ -161,4 +167,55 @@ public abstract class TestSparkReaderDeletes extends DeleteReadTests {
 
     Assert.assertEquals("Table should contain no rows", 0, actual.size());
   }
+
+  @Test
+  public void testReadEqualityDeleteRows() throws IOException {
+    Schema deleteSchema1 = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteSchema1);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d") // id = 89
+    );
+
+    Schema deleteSchema2 = table.schema().select("id");
+    Record idDelete = GenericRecord.create(deleteSchema2);
+    List<Record> idDeletes = Lists.newArrayList(
+        idDelete.copy("id", 121), // id = 121
+        idDelete.copy("id", 122) // id = 122
+    );
+
+    DeleteFile eqDelete1 = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteSchema1);
+
+    DeleteFile eqDelete2 = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), idDeletes, deleteSchema2);
+
+    table.newRowDelta()
+        .addDeletes(eqDelete1)
+        .addDeletes(eqDelete2)
+        .commit();
+
+    StructLikeSet expectedRowSet = rowSetWitIds(29, 89, 121, 122);
+
+    Types.StructType type = table.schema().asStruct();
+    StructLikeSet actualRowSet = StructLikeSet.create(type);
+
+    CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
+        table.newScan().planFiles(),
+        TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    for (CombinedScanTask task : tasks) {
+      try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table.schema(), table.schema(),
+          table.properties().get(DEFAULT_NAME_MAPPING), table.io(), table.encryption(), false)) {
+        while (reader.next()) {
+          actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
+        }
+      }
+    }
+
+    Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size());
+    Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet);
+  }
 }