You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/09/28 18:05:10 UTC

[iceberg] branch master updated: Data: Fix equality deletes with date/time types (#3135)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e52b681  Data: Fix equality deletes with date/time types (#3135)
e52b681 is described below

commit e52b6810f7983f6cd26290a5279d6898fa904ce1
Author: xloya <98...@qq.com>
AuthorDate: Wed Sep 29 02:05:01 2021 +0800

    Data: Fix equality deletes with date/time types (#3135)
    
    Co-authored-by: xiaojiebao <xi...@xiaomi.com>
---
 .../java/org/apache/iceberg/data/DeleteFilter.java |   9 +-
 .../org/apache/iceberg/data/DeleteReadTests.java   | 111 +++++++++++++++++++--
 .../iceberg/data/TestGenericReaderDeletes.java     |   4 +-
 .../iceberg/mr/TestInputFormatReaderDeletes.java   |   2 +
 4 files changed, 113 insertions(+), 13 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index a8eb13c..148f9d9 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -134,9 +134,14 @@ public abstract class DeleteFilter<T> {
 
       Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
           delete -> openDeletes(delete, deleteSchema));
+
+      // copy the delete records because they will be held in a set
+      CloseableIterable<Record> records = CloseableIterable.transform(
+          CloseableIterable.concat(deleteRecords), Record::copy);
+
       StructLikeSet deleteSet = Deletes.toEqualitySet(
-          // copy the delete records because they will be held in a set
-          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          CloseableIterable.transform(
+              records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
           deleteSchema.asStruct());
 
       Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index 70ac774..69b0a57 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.data;
 
 import java.io.IOException;
+import java.time.LocalDate;
 import java.util.List;
 import java.util.Set;
 import org.apache.iceberg.DataFile;
@@ -34,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructProjection;
@@ -51,17 +53,30 @@ public abstract class DeleteReadTests {
       Types.NestedField.required(2, "data", Types.StringType.get())
   );
 
+  public static final Schema DATE_SCHEMA = new Schema(
+      Types.NestedField.required(1, "dt", Types.DateType.get()),
+      Types.NestedField.required(2, "data", Types.StringType.get()),
+      Types.NestedField.required(3, "id", Types.IntegerType.get())
+  );
+
   // Partition spec used to create tables
   public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
       .bucket("data", 16)
       .build();
 
+  public static final PartitionSpec DATE_SPEC = PartitionSpec.builderFor(DATE_SCHEMA)
+      .day("dt")
+      .build();
+
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private String tableName = null;
+  protected String tableName = null;
+  protected String dateTableName = null;
   protected Table table = null;
+  protected Table dateTable = null;
   private List<Record> records = null;
+  private List<Record> dateRecords = null;
   private DataFile dataFile = null;
 
   @Before
@@ -90,6 +105,46 @@ public abstract class DeleteReadTests {
   @After
   public void cleanup() throws IOException {
     dropTable("test");
+    dropTable("test2");
+  }
+
+  private void initDateTable() throws IOException {
+    dropTable("test2");
+    this.dateTableName = "test2";
+    this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC);
+
+    GenericRecord record = GenericRecord.create(dateTable.schema());
+
+    this.dateRecords = Lists.newArrayList(
+        record.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1),
+        record.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2),
+        record.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3),
+        record.copy("dt", LocalDate.parse("2021-09-04"), "data", "d", "id", 4),
+        record.copy("dt", LocalDate.parse("2021-09-05"), "data", "e", "id", 5));
+
+    DataFile dataFile1 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dateRecords.subList(0, 1));
+    DataFile dataFile2 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dateRecords.subList(1, 2));
+    DataFile dataFile3 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dateRecords.subList(2, 3));
+    DataFile dataFile4 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-04"))), dateRecords.subList(3, 4));
+    DataFile dataFile5 = FileHelpers.writeDataFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-05"))), dateRecords.subList(4, 5));
+
+    dateTable.newAppend()
+        .appendFile(dataFile1)
+        .appendFile(dataFile2)
+        .appendFile(dataFile3)
+        .appendFile(dataFile4)
+        .appendFile(dataFile5)
+        .commit();
   }
 
   protected abstract Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException;
@@ -119,13 +174,48 @@ public abstract class DeleteReadTests {
         .addDeletes(eqDeletes)
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
   }
 
   @Test
+  public void testEqualityDateDeletes() throws IOException {
+    initDateTable();
+
+    Schema deleteRowSchema = dateTable.schema().select("*");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1),
+        dataDelete.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2),
+        dataDelete.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3)
+    );
+
+    DeleteFile eqDeletes1 = FileHelpers.writeDeleteFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dataDeletes.subList(0, 1), deleteRowSchema);
+    DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dataDeletes.subList(1, 2), deleteRowSchema);
+    DeleteFile eqDeletes3 = FileHelpers.writeDeleteFile(
+        dateTable, Files.localOutput(temp.newFile()),
+        Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dataDeletes.subList(2, 3), deleteRowSchema);
+
+    dateTable.newRowDelta()
+        .addDeletes(eqDeletes1)
+        .addDeletes(eqDeletes2)
+        .addDeletes(eqDeletes3)
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(dateTable, dateRecords, 1, 2, 3);
+
+    StructLikeSet actual = rowSet(dateTableName, dateTable, "*");
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
   public void testEqualityDeletesWithRequiredEqColumn() throws IOException {
     Schema deleteRowSchema = table.schema().select("data");
     Record dataDelete = GenericRecord.create(deleteRowSchema);
@@ -142,7 +232,7 @@ public abstract class DeleteReadTests {
         .addDeletes(eqDeletes)
         .commit();
 
-    StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id");
+    StructLikeSet expected = selectColumns(rowSetWithoutIds(table, records, 29, 89, 122), "id");
     StructLikeSet actual = rowSet(tableName, table, "id");
 
     if (expectPruned()) {
@@ -180,7 +270,7 @@ public abstract class DeleteReadTests {
         .addDeletes(eqDeletes)
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 122, 144);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -202,7 +292,7 @@ public abstract class DeleteReadTests {
         .validateDataFilesExist(posDeletes.second())
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -235,7 +325,7 @@ public abstract class DeleteReadTests {
         .validateDataFilesExist(posDeletes.second())
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -269,7 +359,7 @@ public abstract class DeleteReadTests {
         .addDeletes(idEqDeletes)
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -306,7 +396,7 @@ public abstract class DeleteReadTests {
         .addDeletes(eqDeletes)
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(131);
+    StructLikeSet expected = rowSetWithoutIds(table, records, 131);
     StructLikeSet actual = rowSet(tableName, table, "*");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -321,11 +411,12 @@ public abstract class DeleteReadTests {
     return set;
   }
 
-  private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
+  private static StructLikeSet rowSetWithoutIds(Table table, List<Record> recordList, int... idsToRemove) {
     Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    records.stream()
+    recordList.stream()
         .filter(row -> !deletedIds.contains(row.getField("id")))
+        .map(record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record))
         .forEach(set::add);
     return set;
   }
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
index 72d4883..e92c0da 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
 
@@ -48,7 +49,8 @@ public class TestGenericReaderDeletes extends DeleteReadTests {
   public StructLikeSet rowSet(String name, Table table, String... columns) throws IOException {
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
-      reader.forEach(set::add);
+      Iterables.addAll(set, CloseableIterable.transform(
+          reader, record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record)));
     }
     return set;
   }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
index ef964f8..2ba4e50 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
@@ -104,6 +105,7 @@ public class TestInputFormatReaderDeletes extends DeleteReadTests {
         .filter(recordFactory -> recordFactory.name().equals(inputFormat))
         .map(recordFactory -> recordFactory.create(builder.project(projected).conf()).getRecords())
         .flatMap(List::stream)
+        .map(record -> new InternalRecordWrapper(projected.asStruct()).wrap(record))
         .collect(Collectors.toList())
     );