You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this copy.)
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/29 21:06:06 UTC
[iceberg] 04/09: Data: Fix equality deletes with date/time types (#3135)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.12.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 57e3fce0080e7aec0afbed83b3f832e903f53c7b
Author: xloya <98...@qq.com>
AuthorDate: Wed Sep 29 02:05:01 2021 +0800
Data: Fix equality deletes with date/time types (#3135)
Co-authored-by: xiaojiebao <xi...@xiaomi.com>
---
.../java/org/apache/iceberg/data/DeleteFilter.java | 9 +-
.../org/apache/iceberg/data/DeleteReadTests.java | 111 +++++++++++++++++++--
.../iceberg/data/TestGenericReaderDeletes.java | 4 +-
.../iceberg/mr/TestInputFormatReaderDeletes.java | 2 +
4 files changed, 113 insertions(+), 13 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index a8eb13c..148f9d9 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -134,9 +134,14 @@ public abstract class DeleteFilter<T> {
Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
delete -> openDeletes(delete, deleteSchema));
+
+ // copy the delete records because they will be held in a set
+ CloseableIterable<Record> records = CloseableIterable.transform(
+ CloseableIterable.concat(deleteRecords), Record::copy);
+
StructLikeSet deleteSet = Deletes.toEqualitySet(
- // copy the delete records because they will be held in a set
- CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+ CloseableIterable.transform(
+ records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
deleteSchema.asStruct());
Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
index 70ac774..69b0a57 100644
--- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.data;
import java.io.IOException;
+import java.time.LocalDate;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.DataFile;
@@ -34,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;
@@ -51,17 +53,30 @@ public abstract class DeleteReadTests {
Types.NestedField.required(2, "data", Types.StringType.get())
);
+ public static final Schema DATE_SCHEMA = new Schema(
+ Types.NestedField.required(1, "dt", Types.DateType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.required(3, "id", Types.IntegerType.get())
+ );
+
// Partition spec used to create tables
public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
.bucket("data", 16)
.build();
+ public static final PartitionSpec DATE_SPEC = PartitionSpec.builderFor(DATE_SCHEMA)
+ .day("dt")
+ .build();
+
@Rule
public TemporaryFolder temp = new TemporaryFolder();
- private String tableName = null;
+ protected String tableName = null;
+ protected String dateTableName = null;
protected Table table = null;
+ protected Table dateTable = null;
private List<Record> records = null;
+ private List<Record> dateRecords = null;
private DataFile dataFile = null;
@Before
@@ -90,6 +105,46 @@ public abstract class DeleteReadTests {
@After
public void cleanup() throws IOException {
dropTable("test");
+ dropTable("test2");
+ }
+
+ private void initDateTable() throws IOException {
+ dropTable("test2");
+ this.dateTableName = "test2";
+ this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC);
+
+ GenericRecord record = GenericRecord.create(dateTable.schema());
+
+ this.dateRecords = Lists.newArrayList(
+ record.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1),
+ record.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2),
+ record.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3),
+ record.copy("dt", LocalDate.parse("2021-09-04"), "data", "d", "id", 4),
+ record.copy("dt", LocalDate.parse("2021-09-05"), "data", "e", "id", 5));
+
+ DataFile dataFile1 = FileHelpers.writeDataFile(
+ dateTable, Files.localOutput(temp.newFile()),
+ Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dateRecords.subList(0, 1));
+ DataFile dataFile2 = FileHelpers.writeDataFile(
+ dateTable, Files.localOutput(temp.newFile()),
+ Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dateRecords.subList(1, 2));
+ DataFile dataFile3 = FileHelpers.writeDataFile(
+ dateTable, Files.localOutput(temp.newFile()),
+ Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dateRecords.subList(2, 3));
+ DataFile dataFile4 = FileHelpers.writeDataFile(
+ dateTable, Files.localOutput(temp.newFile()),
+ Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-04"))), dateRecords.subList(3, 4));
+ DataFile dataFile5 = FileHelpers.writeDataFile(
+ dateTable, Files.localOutput(temp.newFile()),
+ Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-05"))), dateRecords.subList(4, 5));
+
+ dateTable.newAppend()
+ .appendFile(dataFile1)
+ .appendFile(dataFile2)
+ .appendFile(dataFile3)
+ .appendFile(dataFile4)
+ .appendFile(dataFile5)
+ .commit();
}
protected abstract Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException;
@@ -119,13 +174,48 @@ public abstract class DeleteReadTests {
.addDeletes(eqDeletes)
.commit();
- StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+ StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
StructLikeSet actual = rowSet(tableName, table, "*");
Assert.assertEquals("Table should contain expected rows", expected, actual);
}
@Test
+ public void testEqualityDateDeletes() throws IOException {
+ initDateTable();
+
+ Schema deleteRowSchema = dateTable.schema().select("*");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1),
+ dataDelete.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2),
+ dataDelete.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3)
+ );
+
+ DeleteFile eqDeletes1 = FileHelpers.writeDeleteFile(
+ dateTable, Files.localOutput(temp.newFile()),
+ Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dataDeletes.subList(0, 1), deleteRowSchema);
+ DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile(
+ dateTable, Files.localOutput(temp.newFile()),
+ Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dataDeletes.subList(1, 2), deleteRowSchema);
+ DeleteFile eqDeletes3 = FileHelpers.writeDeleteFile(
+ dateTable, Files.localOutput(temp.newFile()),
+ Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dataDeletes.subList(2, 3), deleteRowSchema);
+
+ dateTable.newRowDelta()
+ .addDeletes(eqDeletes1)
+ .addDeletes(eqDeletes2)
+ .addDeletes(eqDeletes3)
+ .commit();
+
+ StructLikeSet expected = rowSetWithoutIds(dateTable, dateRecords, 1, 2, 3);
+
+ StructLikeSet actual = rowSet(dateTableName, dateTable, "*");
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ @Test
public void testEqualityDeletesWithRequiredEqColumn() throws IOException {
Schema deleteRowSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(deleteRowSchema);
@@ -142,7 +232,7 @@ public abstract class DeleteReadTests {
.addDeletes(eqDeletes)
.commit();
- StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id");
+ StructLikeSet expected = selectColumns(rowSetWithoutIds(table, records, 29, 89, 122), "id");
StructLikeSet actual = rowSet(tableName, table, "id");
if (expectPruned()) {
@@ -180,7 +270,7 @@ public abstract class DeleteReadTests {
.addDeletes(eqDeletes)
.commit();
- StructLikeSet expected = rowSetWithoutIds(29, 89, 122, 144);
+ StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144);
StructLikeSet actual = rowSet(tableName, table, "*");
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -202,7 +292,7 @@ public abstract class DeleteReadTests {
.validateDataFilesExist(posDeletes.second())
.commit();
- StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+ StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122);
StructLikeSet actual = rowSet(tableName, table, "*");
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -235,7 +325,7 @@ public abstract class DeleteReadTests {
.validateDataFilesExist(posDeletes.second())
.commit();
- StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+ StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122);
StructLikeSet actual = rowSet(tableName, table, "*");
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -269,7 +359,7 @@ public abstract class DeleteReadTests {
.addDeletes(idEqDeletes)
.commit();
- StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+ StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122);
StructLikeSet actual = rowSet(tableName, table, "*");
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -306,7 +396,7 @@ public abstract class DeleteReadTests {
.addDeletes(eqDeletes)
.commit();
- StructLikeSet expected = rowSetWithoutIds(131);
+ StructLikeSet expected = rowSetWithoutIds(table, records, 131);
StructLikeSet actual = rowSet(tableName, table, "*");
Assert.assertEquals("Table should contain expected rows", expected, actual);
@@ -321,11 +411,12 @@ public abstract class DeleteReadTests {
return set;
}
- private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
+ private static StructLikeSet rowSetWithoutIds(Table table, List<Record> recordList, int... idsToRemove) {
Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
- records.stream()
+ recordList.stream()
.filter(row -> !deletedIds.contains(row.getField("id")))
+ .map(record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record))
.forEach(set::add);
return set;
}
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
index 72d4883..e92c0da 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
@@ -48,7 +49,8 @@ public class TestGenericReaderDeletes extends DeleteReadTests {
public StructLikeSet rowSet(String name, Table table, String... columns) throws IOException {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
- reader.forEach(set::add);
+ Iterables.addAll(set, CloseableIterable.transform(
+ reader, record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record)));
}
return set;
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
index ef964f8..2ba4e50 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
@@ -104,6 +105,7 @@ public class TestInputFormatReaderDeletes extends DeleteReadTests {
.filter(recordFactory -> recordFactory.name().equals(inputFormat))
.map(recordFactory -> recordFactory.create(builder.project(projected).conf()).getRecords())
.flatMap(List::stream)
+ .map(record -> new InternalRecordWrapper(projected.asStruct()).wrap(record))
.collect(Collectors.toList())
);