Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/02/17 21:03:39 UTC
[iceberg] branch master updated: Spark 3.3: Support reading position deletes table (#6716)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bd58ca5dd7 Spark 3.3: Support reading position deletes table (#6716)
bd58ca5dd7 is described below
commit bd58ca5dd7759c990993c3d85a824bec595078e2
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Fri Feb 17 13:03:33 2023 -0800
Spark 3.3: Support reading position deletes table (#6716)
Co-authored-by: Ryan Blue <bl...@apache.org>
---
.../apache/iceberg/expressions/ExpressionUtil.java | 29 +
.../java/org/apache/iceberg/BaseMetadataTable.java | 19 +
.../org/apache/iceberg/PositionDeletesTable.java | 23 +-
.../java/org/apache/iceberg/data/FileHelpers.java | 22 +
.../spark/source/PositionDeletesRowReader.java | 108 +++
.../spark/source/SparkRowReaderFactory.java | 4 +
.../spark/source/TestPositionDeletesTable.java | 861 +++++++++++++++++++++
7 files changed, 1059 insertions(+), 7 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
index 9d94be1635..3512f274e2 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.transforms.Transforms;
@@ -82,6 +83,24 @@ public class ExpressionUtil {
return ExpressionVisitors.visit(expr, new StringSanitizer());
}
+ /**
+ * Extracts an expression that references only the given column IDs from the given expression.
+ *
+ * <p>The result is inclusive. If a row would match the original filter, it must match the result
+ * filter.
+ *
+ * @param expression a filter Expression
+ * @param schema a Schema
+ * @param caseSensitive whether binding is case sensitive
+ * @param ids field IDs used to match predicates to extract from the expression
+ * @return an Expression that selects at least the same rows as the original using only the IDs
+ */
+ public static Expression extractByIdInclusive(
+ Expression expression, Schema schema, boolean caseSensitive, int... ids) {
+ PartitionSpec spec = identitySpec(schema, ids);
+ return Projections.inclusive(spec, caseSensitive).project(Expressions.rewriteNot(expression));
+ }
+
/**
* Returns whether two unbound expressions will accept the same inputs.
*
@@ -396,4 +415,14 @@ public class ExpressionUtil {
// hash the value and return the hash as hex
return String.format("(hash-%08x)", HASH_FUNC.apply(value));
}
+
+ private static PartitionSpec identitySpec(Schema schema, int... ids) {
+ PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);
+
+ for (int id : ids) {
+ specBuilder.identity(schema.findColumnName(id));
+ }
+
+ return specBuilder.build();
+ }
}
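
For reference, a minimal sketch of how the new extractByIdInclusive helper behaves. The schema, filter, and field IDs below are illustrative, not part of this commit:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.ExpressionUtil;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.types.Types;

    // Illustrative schema with field IDs 1 ("id") and 2 ("data").
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()));

    Expression filter =
        Expressions.and(Expressions.greaterThan("id", 5), Expressions.equal("data", "a"));

    // Keep only the predicates that reference field ID 1. The projection is
    // inclusive, so the result matches at least every row the original matches.
    Expression idOnly = ExpressionUtil.extractByIdInclusive(filter, schema, true, 1);
    // idOnly is equivalent to: id > 5 (the predicate on "data" projects to alwaysTrue)
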
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 251ed008a5..d5b89865b5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
@@ -83,6 +84,24 @@ public abstract class BaseMetadataTable extends BaseReadOnlyTable
return builder.build();
}
+ /**
+ * Transforms the given partition specs into specs that are used to rewrite the
+ * user-provided filter expression against the given metadata table.
+ *
+ * <p>See: {@link #transformSpec(Schema, PartitionSpec)}
+ *
+ * @param metadataTableSchema schema of the metadata table
+ * @param specs specs on which the metadata table schema is based
+ * @return specs used to rewrite the metadata table filters to partition filters using an
+ * inclusive projection
+ */
+ static Map<Integer, PartitionSpec> transformSpecs(
+ Schema metadataTableSchema, Map<Integer, PartitionSpec> specs) {
+ return specs.values().stream()
+ .map(spec -> transformSpec(metadataTableSchema, spec))
+ .collect(Collectors.toMap(PartitionSpec::specId, spec -> spec));
+ }
+
abstract MetadataTableType metadataTableType();
protected BaseTable table() {
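
The new transformSpecs helper centralizes spec rewriting that PositionDeletesTable previously inlined (see the next diff). A sketch of the internal usage pattern; the helper is package-private, so callers live in org.apache.iceberg:

    // Rewrite every spec of the base table against the metadata table schema,
    // keyed by spec ID, then look up the spec matching the table's default.
    Map<Integer, PartitionSpec> transformed = transformSpecs(metadataTableSchema, table.specs());
    PartitionSpec defaultSpec = transformed.get(table.spec().specId());
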
diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 47d0e5100e..29228d136d 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
@@ -43,15 +42,18 @@ import org.apache.iceberg.util.TableScanUtil;
public class PositionDeletesTable extends BaseMetadataTable {
private final Schema schema;
+ private final int defaultSpecId;
+ private final Map<Integer, PartitionSpec> specs;
PositionDeletesTable(Table table) {
- super(table, table.name() + ".position_deletes");
- this.schema = calculateSchema();
+ this(table, table.name() + ".position_deletes");
}
PositionDeletesTable(Table table, String name) {
super(table, name);
this.schema = calculateSchema();
+ this.defaultSpecId = table.spec().specId();
+ this.specs = transformSpecs(schema(), table.specs());
}
@Override
@@ -75,6 +77,16 @@ public class PositionDeletesTable extends BaseMetadataTable {
return schema;
}
+ @Override
+ public PartitionSpec spec() {
+ return specs.get(defaultSpecId);
+ }
+
+ @Override
+ public Map<Integer, PartitionSpec> specs() {
+ return specs;
+ }
+
private Schema calculateSchema() {
Types.StructType partitionType = Partitioning.partitionType(table());
Schema result =
@@ -144,10 +156,7 @@ public class PositionDeletesTable extends BaseMetadataTable {
String schemaString = SchemaParser.toJson(tableSchema());
// prepare transformed partition specs and caches
- Map<Integer, PartitionSpec> transformedSpecs =
- table().specs().values().stream()
- .map(spec -> transformSpec(tableSchema(), spec))
- .collect(Collectors.toMap(PartitionSpec::specId, spec -> spec));
+ Map<Integer, PartitionSpec> transformedSpecs = transformSpecs(tableSchema(), table().specs());
LoadingCache<Integer, ResidualEvaluator> residualCache =
partitionCacheOf(
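
With these overrides, the position deletes metadata table reports partition specs projected onto its own schema. A short sketch of obtaining the table, mirroring the tests below:

    import org.apache.iceberg.MetadataTableType;
    import org.apache.iceberg.MetadataTableUtils;
    import org.apache.iceberg.Table;

    Table deletesTable =
        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
    deletesTable.spec();   // default spec, rewritten against the metadata table schema
    deletesTable.specs();  // all specs, keyed by spec ID
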
diff --git a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
index 6531441fa5..557e775057 100644
--- a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
+++ b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -139,6 +139,28 @@ public class FileHelpers {
.build();
}
+ public static DeleteFile writePosDeleteFile(
+ Table table, OutputFile out, StructLike partition, List<PositionDelete<?>> deletes)
+ throws IOException {
+ FileFormat format = defaultFormat(table.properties());
+ FileAppenderFactory<Record> factory =
+ new GenericAppenderFactory(
+ table.schema(),
+ table.spec(),
+ null, // Equality Fields
+ null, // Equality Delete row schema
+ table.schema()); // Position Delete row schema (will be wrapped)
+
+ PositionDeleteWriter<?> writer = factory.newPosDeleteWriter(encrypt(out), format, partition);
+ try (Closeable toClose = writer) {
+ for (PositionDelete delete : deletes) {
+ writer.write(delete);
+ }
+ }
+
+ return writer.toDeleteFile();
+ }
+
private static EncryptedOutputFile encrypt(OutputFile out) {
return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY);
}
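
Typical usage of the new test helper, as exercised by TestPositionDeletesTable below. The data file, partition, and output file here are placeholders:

    // Delete position 0 of a data file; the third argument is the optional row payload.
    PositionDelete<GenericRecord> delete = PositionDelete.create();
    delete.set(dataFile.path(), 0L, null);

    List<PositionDelete<?>> deletes = Lists.newArrayList(delete);
    DeleteFile posDeletes =
        FileHelpers.writePosDeleteFile(table, Files.localOutput(outputFile), partition, deletes);
    table.newRowDelta().addDeletes(posDeletes).commit();
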
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
new file mode 100644
index 0000000000..04eecc80bb
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+ implements PartitionReader<InternalRow> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+ PositionDeletesRowReader(SparkInputPartition partition) {
+ this(
+ partition.table(),
+ partition.taskGroup(),
+ partition.expectedSchema(),
+ partition.isCaseSensitive());
+ }
+
+ PositionDeletesRowReader(
+ Table table,
+ ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+ Schema expectedSchema,
+ boolean caseSensitive) {
+
+ super(table, taskGroup, expectedSchema, caseSensitive);
+
+ int numSplits = taskGroup.tasks().size();
+ LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+ }
+
+ @Override
+ protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+ return Stream.of(task.file());
+ }
+
+ @Override
+ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+ String filePath = task.file().path().toString();
+ LOG.debug("Opening position delete file {}", filePath);
+
+ // update the current file for Spark's input_file_name() function
+ InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+ InputFile inputFile = getInputFile(filePath);
+ Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with %s", task);
+
+ // remove constant fields from the residual before pushing the filter down to the row reader
+ Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
+ Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+ Expression residualWithoutConstants =
+ ExpressionUtil.extractByIdInclusive(
+ task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
+
+ return newIterable(
+ inputFile,
+ task.file().format(),
+ task.start(),
+ task.length(),
+ residualWithoutConstants,
+ expectedSchema(),
+ idToConstant)
+ .iterator();
+ }
+
+ private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
+ Set<Integer> fields = expectedSchema().idToName().keySet();
+ return fields.stream()
+ .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
+ .filter(id -> !idToConstant.containsKey(id))
+ .collect(Collectors.toSet());
+ }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java
index 8755cb84a2..23699aeb16 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
import org.apache.iceberg.ChangelogScanTask;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
@@ -46,6 +47,9 @@ class SparkRowReaderFactory implements PartitionReaderFactory {
} else if (partition.allTasksOfType(ChangelogScanTask.class)) {
return new ChangelogRowReader(partition);
+ } else if (partition.allTasksOfType(PositionDeletesScanTask.class)) {
+ return new PositionDeletesRowReader(partition);
+
} else {
throw new UnsupportedOperationException(
"Unsupported task group for row-based reads: " + partition.taskGroup());
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
new file mode 100644
index 0000000000..353bbf07be
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkStructLike;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.functions;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestPositionDeletesTable extends SparkTestBase {
+
+ public static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()));
+ private final FileFormat format;
+
+ @Parameterized.Parameters(name = "fileFormat = {0}")
+ public static Object[][] parameters() {
+ return new Object[][] {{FileFormat.PARQUET}, {FileFormat.AVRO}, {FileFormat.ORC}};
+ }
+
+ public TestPositionDeletesTable(FileFormat format) {
+ this.format = format;
+ }
+
+ @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+ @Test
+ public void testNullRows() throws IOException {
+ String tableName = "null_rows";
+ Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+
+ DataFile dFile = dataFile(tab);
+ tab.newAppend().appendFile(dFile).commit();
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
+ deletes.add(Pair.of(dFile.path(), 0L));
+ deletes.add(Pair.of(dFile.path(), 1L));
+ Pair<DeleteFile, CharSequenceSet> posDeletes =
+ FileHelpers.writeDeleteFile(
+ tab, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+ tab.newRowDelta().addDeletes(posDeletes.first()).commit();
+
+ StructLikeSet actual = actual(tableName, tab);
+
+ List<PositionDelete<?>> expectedDeletes =
+ Lists.newArrayList(positionDelete(dFile.path(), 0L), positionDelete(dFile.path(), 1L));
+ StructLikeSet expected =
+ expected(tab, expectedDeletes, null, posDeletes.first().path().toString());
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testPartitionedTable() throws IOException {
+ // Create table with two partitions
+ String tableName = "partitioned_table";
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table tab = createTable(tableName, SCHEMA, spec);
+
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ // Add position deletes for both partitions
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Select deletes from one partition
+ StructLikeSet actual = actual(tableName, tab, "row.data='b'");
+ GenericRecord partitionB = GenericRecord.create(tab.spec().partitionType());
+ partitionB.setField("data", "b");
+ StructLikeSet expected =
+ expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testSelect() throws IOException {
+ // Create table with two partitions
+ String tableName = "select";
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table tab = createTable(tableName, SCHEMA, spec);
+
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ // Add position deletes for both partitions
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Select certain columns
+ Dataset<Row> df =
+ spark
+ .read()
+ .format("iceberg")
+ .load("default." + tableName + ".position_deletes")
+ .withColumn("input_file", functions.input_file_name())
+ .select("row.id", "pos", "delete_file_path", "input_file");
+ List<Object[]> actual = rowsToJava(df.collectAsList());
+
+ // Select cols from expected delete values
+ List<Object[]> expected = Lists.newArrayList();
+ BiFunction<PositionDelete<?>, DeleteFile, Object[]> toRow =
+ (delete, file) -> {
+ int rowData = delete.get(2, GenericRecord.class).get(0, Integer.class);
+ long pos = delete.get(1, Long.class);
+ return row(rowData, pos, file.path().toString(), file.path().toString());
+ };
+ expected.addAll(
+ deletesA.first().stream()
+ .map(d -> toRow.apply(d, deletesA.second()))
+ .collect(Collectors.toList()));
+ expected.addAll(
+ deletesB.first().stream()
+ .map(d -> toRow.apply(d, deletesB.second()))
+ .collect(Collectors.toList()));
+
+ // Sort and compare
+ Comparator<Object[]> comp =
+ (o1, o2) -> {
+ int result = Integer.compare((int) o1[0], (int) o2[0]);
+ if (result != 0) {
+ return result;
+ } else {
+ return ((String) o1[2]).compareTo((String) o2[2]);
+ }
+ };
+ actual.sort(comp);
+ expected.sort(comp);
+ assertEquals("Position Delete table should contain expected rows", expected, actual);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testSplitTasks() throws IOException {
+ String tableName = "big_table";
+ Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+ tab.updateProperties().set("read.split.target-size", "100").commit();
+ int records = 500;
+
+ GenericRecord record = GenericRecord.create(tab.schema());
+ List<org.apache.iceberg.data.Record> dataRecords = Lists.newArrayList();
+ for (int i = 0; i < records; i++) {
+ dataRecords.add(record.copy("id", i, "data", String.valueOf(i)));
+ }
+ DataFile dFile =
+ FileHelpers.writeDataFile(
+ tab,
+ Files.localOutput(temp.newFile()),
+ org.apache.iceberg.TestHelpers.Row.of(),
+ dataRecords);
+ tab.newAppend().appendFile(dFile).commit();
+
+ List<PositionDelete<?>> deletes = Lists.newArrayList();
+ for (long i = 0; i < records; i++) {
+ deletes.add(positionDelete(tab.schema(), dFile.path(), i, (int) i, String.valueOf(i)));
+ }
+ DeleteFile posDeletes =
+ FileHelpers.writePosDeleteFile(
+ tab, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+ tab.newRowDelta().addDeletes(posDeletes).commit();
+
+ Table deleteTable =
+ MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+ Assert.assertTrue(
+ "Position delete scan should produce more than one split",
+ Iterables.size(deleteTable.newBatchScan().planTasks()) > 1);
+
+ StructLikeSet actual = actual(tableName, tab);
+ StructLikeSet expected = expected(tab, deletes, null, posDeletes.path().toString());
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testPartitionFilter() throws IOException {
+ // Create table with two partitions
+ String tableName = "partition_filter";
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table tab = createTable(tableName, SCHEMA, spec);
+ Table deletesTab =
+ MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ // Add position deletes for both partitions
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Prepare expected values
+ GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ Record partitionB = partitionRecordTemplate.copy("data", "b");
+ StructLikeSet expectedA =
+ expected(tab, deletesA.first(), partitionA, deletesA.second().path().toString());
+ StructLikeSet expectedB =
+ expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
+ StructLikeSet allExpected = StructLikeSet.create(deletesTab.schema().asStruct());
+ allExpected.addAll(expectedA);
+ allExpected.addAll(expectedB);
+
+ // Select deletes from all partitions
+ StructLikeSet actual = actual(tableName, tab);
+ Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
+
+ // Select deletes from one partition
+ StructLikeSet actual2 = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actual2);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testPartitionTransformFilter() throws IOException {
+ // Create table with two partitions
+ String tableName = "partition_transform_filter";
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).truncate("data", 1).build();
+ Table tab = createTable(tableName, SCHEMA, spec);
+ Table deletesTable =
+ MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+
+ DataFile dataFileA = dataFile(tab, new Object[] {"aa"}, new Object[] {"a"});
+ DataFile dataFileB = dataFile(tab, new Object[] {"bb"}, new Object[] {"b"});
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ // Add position deletes for both partitions
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
+ deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
+ deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"});
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Prepare expected values
+ GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
+ Record partitionA = partitionRecordTemplate.copy("data_trunc", "a");
+ Record partitionB = partitionRecordTemplate.copy("data_trunc", "b");
+ StructLikeSet expectedA =
+ expected(tab, deletesA.first(), partitionA, deletesA.second().path().toString());
+ StructLikeSet expectedB =
+ expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
+ StructLikeSet allExpected = StructLikeSet.create(deletesTable.schema().asStruct());
+ allExpected.addAll(expectedA);
+ allExpected.addAll(expectedB);
+
+ // Select deletes from all partitions
+ StructLikeSet actual = actual(tableName, tab);
+ Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
+
+ // Select deletes from one partition
+ StructLikeSet actual2 = actual(tableName, tab, "partition.data_trunc = 'a' AND pos >= 0");
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actual2);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testPartitionEvolutionReplace() throws Exception {
+ // Create table with spec (data)
+ String tableName = "partition_evolution";
+ PartitionSpec originalSpec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table tab = createTable(tableName, SCHEMA, originalSpec);
+ int dataSpec = tab.spec().specId();
+
+ // Add files with old spec (data)
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Switch partition spec from (data) to (id)
+ tab.updateSpec().removeField("data").addField("id").commit();
+
+ // Add data and delete files with new spec (id)
+ DataFile dataFile10 = dataFile(tab, 10);
+ DataFile dataFile99 = dataFile(tab, 99);
+ tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10);
+ Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99);
+ tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
+
+ // Query partition of old spec
+ GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ StructLikeSet expectedA =
+ expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().path().toString());
+ StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+ // Query partition of new spec
+ Record partition10 = partitionRecordTemplate.copy("id", 10);
+ StructLikeSet expected10 =
+ expected(
+ tab,
+ deletes10.first(),
+ partition10,
+ tab.spec().specId(),
+ deletes10.second().path().toString());
+ StructLikeSet actual10 = actual(tableName, tab, "partition.id = 10 AND pos >= 0");
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expected10, actual10);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testPartitionEvolutionAdd() throws Exception {
+ // Create unpartitioned table
+ String tableName = "partition_evolution_add";
+ Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+ int specId0 = tab.spec().specId();
+
+ // Add files with unpartitioned spec
+ DataFile dataFileUnpartitioned = dataFile(tab);
+ tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+ deleteFile(tab, dataFileUnpartitioned);
+ tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+ // Switch partition spec to (data)
+ tab.updateSpec().addField("data").commit();
+ int specId1 = tab.spec().specId();
+
+ // Add files with new spec (data)
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Select deletes from new spec (data)
+ GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ StructLikeSet expectedA =
+ expected(tab, deletesA.first(), partitionA, specId1, deletesA.second().path().toString());
+ StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+ // Select deletes from 'unpartitioned' data
+ Record unpartitionedRecord = partitionRecordTemplate.copy("data", null);
+ StructLikeSet expectedUnpartitioned =
+ expected(
+ tab,
+ deletesUnpartitioned.first(),
+ unpartitionedRecord,
+ specId0,
+ deletesUnpartitioned.second().path().toString());
+ StructLikeSet actualUnpartitioned =
+ actual(tableName, tab, "partition.data IS NULL and pos >= 0");
+
+ Assert.assertEquals(
+ "Position Delete table should contain expected rows",
+ expectedUnpartitioned,
+ actualUnpartitioned);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testPartitionEvolutionRemove() throws Exception {
+ // Create table with spec (data)
+ String tableName = "partition_evolution_remove";
+ PartitionSpec originalSpec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table tab = createTable(tableName, SCHEMA, originalSpec);
+ int specId0 = tab.spec().specId();
+
+ // Add files with spec (data)
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Remove partition field
+ tab.updateSpec().removeField("data").commit();
+ int specId1 = tab.spec().specId();
+
+ // Add unpartitioned files
+ DataFile dataFileUnpartitioned = dataFile(tab);
+ tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+ deleteFile(tab, dataFileUnpartitioned);
+ tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+ // Select deletes from (data) spec
+ GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ StructLikeSet expectedA =
+ expected(tab, deletesA.first(), partitionA, specId0, deletesA.second().path().toString());
+ StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+ // Select deletes from 'unpartitioned' spec
+ Record unpartitionedRecord = partitionRecordTemplate.copy("data", null);
+ StructLikeSet expectedUnpartitioned =
+ expected(
+ tab,
+ deletesUnpartitioned.first(),
+ unpartitionedRecord,
+ specId1,
+ deletesUnpartitioned.second().path().toString());
+ StructLikeSet actualUnpartitioned =
+ actual(tableName, tab, "partition.data IS NULL and pos >= 0");
+
+ Assert.assertEquals(
+ "Position Delete table should contain expected rows",
+ expectedUnpartitioned,
+ actualUnpartitioned);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testSpecIdFilter() throws Exception {
+ // Create table with spec (data)
+ String tableName = "spec_id_filter";
+ Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+ int unpartitionedSpec = tab.spec().specId();
+
+ // Add data file and delete
+ DataFile dataFileUnpartitioned = dataFile(tab);
+ tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+ deleteFile(tab, dataFileUnpartitioned);
+ tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+ // Switch partition spec to (data) and add files
+ tab.updateSpec().addField("data").commit();
+ int dataSpec = tab.spec().specId();
+
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Select deletes from 'unpartitioned'
+ GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+ StructLikeSet expectedUnpartitioned =
+ expected(
+ tab,
+ deletesUnpartitioned.first(),
+ partitionRecordTemplate,
+ unpartitionedSpec,
+ deletesUnpartitioned.second().path().toString());
+ StructLikeSet actualUnpartitioned =
+ actual(tableName, tab, String.format("spec_id = %d", unpartitionedSpec));
+ Assert.assertEquals(
+ "Position Delete table should contain expected rows",
+ expectedUnpartitioned,
+ actualUnpartitioned);
+
+ // Select deletes from 'data' partition spec
+ StructLike partitionA = partitionRecordTemplate.copy("data", "a");
+ StructLike partitionB = partitionRecordTemplate.copy("data", "b");
+ StructLikeSet expected =
+ expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().path().toString());
+ expected.addAll(
+ expected(tab, deletesB.first(), partitionB, dataSpec, deletesB.second().path().toString()));
+
+ StructLikeSet actual = actual(tableName, tab, String.format("spec_id = %d", dataSpec));
+ Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testSchemaEvolutionAdd() throws Exception {
+ // Create table with original schema
+ String tableName = "schema_evolution_add";
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table tab = createTable(tableName, SCHEMA, spec);
+
+ // Add files with original schema
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Evolve the schema by adding two columns
+ tab.updateSchema()
+ .addColumn("new_col_1", Types.IntegerType.get())
+ .addColumn("new_col_2", Types.IntegerType.get())
+ .commit();
+
+ // Add files with new schema
+ DataFile dataFileC = dataFile(tab, "c");
+ DataFile dataFileD = dataFile(tab, "d");
+ tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
+ tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();
+
+ // Select deletes from old schema
+ GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ // pad expected delete rows with null values for new columns
+ List<PositionDelete<?>> expectedDeletesA = deletesA.first();
+ expectedDeletesA.forEach(
+ d -> {
+ GenericRecord nested = d.get(2, GenericRecord.class);
+ GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
+ padded.set(0, nested.get(0));
+ padded.set(1, nested.get(1));
+ padded.set(2, null);
+ padded.set(3, null);
+ d.set(2, padded);
+ });
+ StructLikeSet expectedA =
+ expected(tab, expectedDeletesA, partitionA, deletesA.second().path().toString());
+ StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+ // Select deletes from new schema
+ Record partitionC = partitionRecordTemplate.copy("data", "c");
+ StructLikeSet expectedC =
+ expected(tab, deletesC.first(), partitionC, deletesC.second().path().toString());
+ StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0");
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testSchemaEvolutionRemove() throws Exception {
+ // Create table with original schema
+ String tableName = "schema_evolution_remove";
+ Schema oldSchema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "new_col_1", Types.IntegerType.get()),
+ Types.NestedField.optional(4, "new_col_2", Types.IntegerType.get()));
+ PartitionSpec spec = PartitionSpec.builderFor(oldSchema).identity("data").build();
+ Table tab = createTable(tableName, oldSchema, spec);
+
+ // Add files with original schema
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Evolve the schema by removing two columns
+ tab.updateSchema().deleteColumn("new_col_1").deleteColumn("new_col_2").commit();
+
+ // Add files with new schema
+ DataFile dataFileC = dataFile(tab, "c");
+ DataFile dataFileD = dataFile(tab, "d");
+ tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
+ tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();
+
+ // Select deletes from old schema
+ GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ // remove deleted columns from expected result
+ List<PositionDelete<?>> expectedDeletesA = deletesA.first();
+ expectedDeletesA.forEach(
+ d -> {
+ GenericRecord nested = d.get(2, GenericRecord.class);
+ GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
+ padded.set(0, nested.get(0));
+ padded.set(1, nested.get(1));
+ d.set(2, padded);
+ });
+ StructLikeSet expectedA =
+ expected(tab, expectedDeletesA, partitionA, deletesA.second().path().toString());
+ StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+ // Select deletes from new schema
+ Record partitionC = partitionRecordTemplate.copy("data", "c");
+ StructLikeSet expectedC =
+ expected(tab, deletesC.first(), partitionC, deletesC.second().path().toString());
+ StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0");
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
+ dropTable(tableName);
+ }
+
+ private StructLikeSet actual(String tableName, Table table) {
+ return actual(tableName, table, null);
+ }
+
+ private StructLikeSet actual(String tableName, Table table, String filter) {
+ Dataset<Row> df =
+ spark.read().format("iceberg").load("default." + tableName + ".position_deletes");
+ if (filter != null) {
+ df = df.filter(filter);
+ }
+ Table deletesTable =
+ MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
+ Types.StructType projection = deletesTable.schema().asStruct();
+ StructLikeSet set = StructLikeSet.create(projection);
+ df.collectAsList()
+ .forEach(
+ row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ set.add(rowWrapper.wrap(row));
+ });
+
+ return set;
+ }
+
+ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
+ Map<String, String> properties =
+ ImmutableMap.of(
+ TableProperties.FORMAT_VERSION,
+ "2",
+ TableProperties.DEFAULT_FILE_FORMAT,
+ format.toString());
+ return catalog.createTable(TableIdentifier.of("default", name), schema, spec, properties);
+ }
+
+ protected void dropTable(String name) {
+ catalog.dropTable(TableIdentifier.of("default", name));
+ }
+
+ private PositionDelete<GenericRecord> positionDelete(CharSequence path, Long position) {
+ PositionDelete<GenericRecord> posDelete = PositionDelete.create();
+ posDelete.set(path, position, null);
+ return posDelete;
+ }
+
+ private PositionDelete<GenericRecord> positionDelete(
+ Schema tableSchema, CharSequence path, Long position, Object... values) {
+ PositionDelete<GenericRecord> posDelete = PositionDelete.create();
+ GenericRecord nested = GenericRecord.create(tableSchema);
+ for (int i = 0; i < values.length; i++) {
+ nested.set(i, values[i]);
+ }
+ posDelete.set(path, position, nested);
+ return posDelete;
+ }
+
+ private StructLikeSet expected(
+ Table testTable,
+ List<PositionDelete<?>> deletes,
+ StructLike partitionStruct,
+ int specId,
+ String deleteFilePath) {
+ Table deletesTable =
+ MetadataTableUtils.createMetadataTableInstance(
+ testTable, MetadataTableType.POSITION_DELETES);
+ Types.StructType posDeleteSchema = deletesTable.schema().asStruct();
+ StructLikeSet set = StructLikeSet.create(posDeleteSchema);
+ deletes.stream()
+ .map(
+ p -> {
+ GenericRecord record = GenericRecord.create(posDeleteSchema);
+ record.setField("file_path", p.path());
+ record.setField("pos", p.pos());
+ record.setField("row", p.row());
+ if (partitionStruct != null) {
+ record.setField("partition", partitionStruct);
+ }
+ record.setField("spec_id", specId);
+ record.setField("delete_file_path", deleteFilePath);
+ return record;
+ })
+ .forEach(set::add);
+ return set;
+ }
+
+ private StructLikeSet expected(
+ Table testTable,
+ List<PositionDelete<?>> deletes,
+ StructLike partitionStruct,
+ String deleteFilePath) {
+ return expected(testTable, deletes, partitionStruct, testTable.spec().specId(), deleteFilePath);
+ }
+
+ private DataFile dataFile(Table tab, Object... partValues) throws IOException {
+ return dataFile(tab, partValues, partValues);
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private DataFile dataFile(Table tab, Object[] partDataValues, Object[] partFieldValues)
+ throws IOException {
+ GenericRecord record = GenericRecord.create(tab.schema());
+ List<String> partitionFieldNames =
+ tab.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList());
+ int idIndex = partitionFieldNames.indexOf("id");
+ int dataIndex = partitionFieldNames.indexOf("data");
+ Integer idPartition = idIndex != -1 ? (Integer) partDataValues[idIndex] : null;
+ String dataPartition = dataIndex != -1 ? (String) partDataValues[dataIndex] : null;
+
+ // fill columns with partition source fields, or preset values
+ List<Record> records =
+ Lists.newArrayList(
+ record.copy(
+ "id",
+ idPartition != null ? idPartition : 29,
+ "data",
+ dataPartition != null ? dataPartition : "c"),
+ record.copy(
+ "id",
+ idPartition != null ? idPartition : 43,
+ "data",
+ dataPartition != null ? dataPartition : "k"),
+ record.copy(
+ "id",
+ idPartition != null ? idPartition : 61,
+ "data",
+ dataPartition != null ? dataPartition : "r"),
+ record.copy(
+ "id",
+ idPartition != null ? idPartition : 89,
+ "data",
+ dataPartition != null ? dataPartition : "t"));
+
+ // fill remaining columns with incremental values
+ List<Types.NestedField> cols = tab.schema().columns();
+ if (cols.size() > 2) {
+ for (int i = 2; i < cols.size(); i++) {
+ final int pos = i;
+ records.forEach(r -> r.set(pos, pos));
+ }
+ }
+
+ TestHelpers.Row partitionInfo = TestHelpers.Row.of(partFieldValues);
+ return FileHelpers.writeDataFile(
+ tab, Files.localOutput(temp.newFile()), partitionInfo, records);
+ }
+
+ private Pair<List<PositionDelete<?>>, DeleteFile> deleteFile(
+ Table tab, DataFile dataFile, Object... partValues) throws IOException {
+ return deleteFile(tab, dataFile, partValues, partValues);
+ }
+
+ private Pair<List<PositionDelete<?>>, DeleteFile> deleteFile(
+ Table tab, DataFile dataFile, Object[] partDataValues, Object[] partFieldValues)
+ throws IOException {
+ List<PartitionField> partFields = tab.spec().fields();
+ List<String> partitionFieldNames =
+ partFields.stream().map(PartitionField::name).collect(Collectors.toList());
+ int idIndex = partitionFieldNames.indexOf("id");
+ int dataIndex = partitionFieldNames.indexOf("data");
+ Integer idPartition = idIndex != -1 ? (Integer) partDataValues[idIndex] : null;
+ String dataPartition = dataIndex != -1 ? (String) partDataValues[dataIndex] : null;
+
+ // fill columns with partition source fields, or preset values
+ List<PositionDelete<?>> deletes =
+ Lists.newArrayList(
+ positionDelete(
+ tab.schema(),
+ dataFile.path(),
+ 0L,
+ idPartition != null ? idPartition : 29,
+ dataPartition != null ? dataPartition : "c"),
+ positionDelete(
+ tab.schema(),
+ dataFile.path(),
+ 0L,
+ idPartition != null ? idPartition : 61,
+ dataPartition != null ? dataPartition : "r"));
+
+ // fill remaining columns with incremental values
+ List<Types.NestedField> cols = tab.schema().columns();
+ if (cols.size() > 2) {
+ for (int i = 2; i < cols.size(); i++) {
+ final int pos = i;
+ deletes.forEach(d -> d.get(2, GenericRecord.class).set(pos, pos));
+ }
+ }
+
+ TestHelpers.Row partitionInfo = TestHelpers.Row.of(partFieldValues);
+
+ DeleteFile deleteFile =
+ FileHelpers.writePosDeleteFile(
+ tab, Files.localOutput(temp.newFile()), partitionInfo, deletes);
+ return Pair.of(deletes, deleteFile);
+ }
+}
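
End to end, the new table can be queried from Spark the same way the tests above do; "default.my_table" is a placeholder name:

    Dataset<Row> deletes =
        spark
            .read()
            .format("iceberg")
            .load("default.my_table.position_deletes")
            .filter("partition.data = 'a' AND pos >= 0");
    deletes.select("file_path", "pos", "row", "delete_file_path").show();
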