Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/04/05 17:27:34 UTC
[iceberg] branch master updated: Spark 3.3: Dataset writes for position deletes (#7029)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 22d29a5a9a Spark 3.3: Dataset writes for position deletes (#7029)
22d29a5a9a is described below
commit 22d29a5a9ae0d96139ceb0b5675981178fda210a
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Wed Apr 5 10:27:26 2023 -0700
Spark 3.3: Dataset writes for position deletes (#7029)
---
.../org/apache/iceberg/PositionDeletesTable.java | 10 +-
...inator.java => BaseFileRewriteCoordinator.java} | 49 +-
.../iceberg/spark/FileRewriteCoordinator.java | 66 +-
.../spark/PositionDeletesRewriteCoordinator.java | 33 +
.../spark/source/SparkPositionDeletesRewrite.java | 413 ++++++++++++
.../source/SparkPositionDeletesRewriteBuilder.java | 113 ++++
.../apache/iceberg/spark/source/SparkTable.java | 7 +-
.../spark/actions/TestRewriteDataFilesAction.java | 2 +-
.../spark/source/TestPositionDeletesTable.java | 695 ++++++++++++++++++++-
9 files changed, 1276 insertions(+), 112 deletions(-)
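
For context, the tests added below exercise the new write path end to end: scan tasks for one rewrite group are staged with ScanTaskSetManager, read back through the position_deletes metadata table, written via the new SparkPositionDeletesRewrite path, and the resulting delete files are staged in PositionDeletesRewriteCoordinator for the calling action to commit. The following is an illustrative sketch of that flow, not part of the patch; the table name "catalog.db.tbl.position_deletes" is a placeholder, and imports and checked-exception handling are omitted.

    // Stage the position delete scan tasks for one rewrite group under a random file set id.
    Table posDeletesTable =
        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
    String fileSetId = UUID.randomUUID().toString();
    try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
      ScanTaskSetManager.get().stageTasks(table, fileSetId, Lists.newArrayList(tasks));
    }

    // Read the staged tasks as a Dataset and write them back through the rewrite builder.
    Dataset<Row> scanDF =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetId)
            .load("catalog.db.tbl.position_deletes");

    scanDF
        .writeTo("catalog.db.tbl.position_deletes")
        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetId)
        .append();

    // The new delete files are only staged; the calling action fetches them from the coordinator.
    Set<DeleteFile> newDeleteFiles =
        PositionDeletesRewriteCoordinator.get().fetchNewFiles(table, fileSetId);
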
diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 29228d136d..1983e0ddfc 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -41,6 +41,10 @@ import org.apache.iceberg.util.TableScanUtil;
*/
public class PositionDeletesTable extends BaseMetadataTable {
+ public static final String PARTITION = "partition";
+ public static final String SPEC_ID = "spec_id";
+ public static final String DELETE_FILE_PATH = "delete_file_path";
+
private final Schema schema;
private final int defaultSpecId;
private final Map<Integer, PartitionSpec> specs;
@@ -100,17 +104,17 @@ public class PositionDeletesTable extends BaseMetadataTable {
MetadataColumns.DELETE_FILE_ROW_DOC),
Types.NestedField.required(
MetadataColumns.PARTITION_COLUMN_ID,
- "partition",
+ PARTITION,
partitionType,
"Partition that position delete row belongs to"),
Types.NestedField.required(
MetadataColumns.SPEC_ID_COLUMN_ID,
- "spec_id",
+ SPEC_ID,
Types.IntegerType.get(),
MetadataColumns.SPEC_ID_COLUMN_DOC),
Types.NestedField.required(
MetadataColumns.FILE_PATH_COLUMN_ID,
- "delete_file_path",
+ DELETE_FILE_PATH,
Types.StringType.get(),
MetadataColumns.FILE_PATH_COLUMN_DOC));
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseFileRewriteCoordinator.java
similarity index 61%
copy from spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
copy to spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseFileRewriteCoordinator.java
index 210e861a4c..45c46f1a3e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseFileRewriteCoordinator.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.spark;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -31,62 +31,55 @@ import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FileRewriteCoordinator {
+abstract class BaseFileRewriteCoordinator<F extends ContentFile<F>> {
- private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class);
- private static final FileRewriteCoordinator INSTANCE = new FileRewriteCoordinator();
+ private static final Logger LOG = LoggerFactory.getLogger(BaseFileRewriteCoordinator.class);
- private final Map<Pair<String, String>, Set<DataFile>> resultMap = Maps.newConcurrentMap();
-
- private FileRewriteCoordinator() {}
-
- public static FileRewriteCoordinator get() {
- return INSTANCE;
- }
+ private final Map<Pair<String, String>, Set<F>> resultMap = Maps.newConcurrentMap();
/**
* Called to persist the output of a rewrite action for a specific group. Since the write is done
* via a Spark Datasource, we have to propagate the result through this side-effect call.
*
* @param table table where the rewrite is occurring
- * @param fileSetID the id used to identify the source set of files being rewritten
- * @param newDataFiles the new files which have been written
+ * @param fileSetId the id used to identify the source set of files being rewritten
+ * @param newFiles the new files which have been written
*/
- public void stageRewrite(Table table, String fileSetID, Set<DataFile> newDataFiles) {
+ public void stageRewrite(Table table, String fileSetId, Set<F> newFiles) {
LOG.debug(
"Staging the output for {} - fileset {} with {} files",
table.name(),
- fileSetID,
- newDataFiles.size());
- Pair<String, String> id = toID(table, fileSetID);
- resultMap.put(id, newDataFiles);
+ fileSetId,
+ newFiles.size());
+ Pair<String, String> id = toId(table, fileSetId);
+ resultMap.put(id, newFiles);
}
- public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) {
- Pair<String, String> id = toID(table, fileSetID);
- Set<DataFile> result = resultMap.get(id);
+ public Set<F> fetchNewFiles(Table table, String fileSetId) {
+ Pair<String, String> id = toId(table, fileSetId);
+ Set<F> result = resultMap.get(id);
ValidationException.check(
- result != null, "No results for rewrite of file set %s in table %s", fileSetID, table);
+ result != null, "No results for rewrite of file set %s in table %s", fileSetId, table);
return result;
}
- public void clearRewrite(Table table, String fileSetID) {
- LOG.debug("Removing entry from RewriteCoordinator for {} - id {}", table.name(), fileSetID);
- Pair<String, String> id = toID(table, fileSetID);
+ public void clearRewrite(Table table, String fileSetId) {
+ LOG.debug("Removing entry for {} - id {}", table.name(), fileSetId);
+ Pair<String, String> id = toId(table, fileSetId);
resultMap.remove(id);
}
- public Set<String> fetchSetIDs(Table table) {
+ public Set<String> fetchSetIds(Table table) {
return resultMap.keySet().stream()
.filter(e -> e.first().equals(tableUUID(table)))
.map(Pair::second)
.collect(Collectors.toSet());
}
- private Pair<String, String> toID(Table table, String setID) {
+ private Pair<String, String> toId(Table table, String setId) {
String tableUUID = tableUUID(table);
- return Pair.of(tableUUID, setID);
+ return Pair.of(tableUUID, setId);
}
private String tableUUID(Table table) {
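
As the javadoc above notes, the coordinator is a side-effect hand-off keyed by (table UUID, file set id) between the Spark datasource write and the calling action. A minimal sketch of the life cycle, using only methods shown in this patch (illustrative, not part of the change):

    FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();

    // The write side stages its output for a given file set id ...
    coordinator.stageRewrite(table, fileSetId, newDataFiles);

    // ... and the action side fetches it and clears the entry once the rewrite is committed.
    Set<DataFile> rewritten = coordinator.fetchNewFiles(table, fileSetId);
    coordinator.clearRewrite(table, fileSetId);
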
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
index 210e861a4c..4f1d0fffcb 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
@@ -18,79 +18,29 @@
*/
package org.apache.iceberg.spark;
-import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
-import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class FileRewriteCoordinator {
+public class FileRewriteCoordinator extends BaseFileRewriteCoordinator<DataFile> {
- private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class);
private static final FileRewriteCoordinator INSTANCE = new FileRewriteCoordinator();
- private final Map<Pair<String, String>, Set<DataFile>> resultMap = Maps.newConcurrentMap();
-
private FileRewriteCoordinator() {}
public static FileRewriteCoordinator get() {
return INSTANCE;
}
- /**
- * Called to persist the output of a rewrite action for a specific group. Since the write is done
- * via a Spark Datasource, we have to propagate the result through this side-effect call.
- *
- * @param table table where the rewrite is occurring
- * @param fileSetID the id used to identify the source set of files being rewritten
- * @param newDataFiles the new files which have been written
- */
- public void stageRewrite(Table table, String fileSetID, Set<DataFile> newDataFiles) {
- LOG.debug(
- "Staging the output for {} - fileset {} with {} files",
- table.name(),
- fileSetID,
- newDataFiles.size());
- Pair<String, String> id = toID(table, fileSetID);
- resultMap.put(id, newDataFiles);
- }
-
- public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) {
- Pair<String, String> id = toID(table, fileSetID);
- Set<DataFile> result = resultMap.get(id);
- ValidationException.check(
- result != null, "No results for rewrite of file set %s in table %s", fileSetID, table);
-
- return result;
- }
-
- public void clearRewrite(Table table, String fileSetID) {
- LOG.debug("Removing entry from RewriteCoordinator for {} - id {}", table.name(), fileSetID);
- Pair<String, String> id = toID(table, fileSetID);
- resultMap.remove(id);
+ /** @deprecated will be removed in 1.4.0; use {@link #fetchNewFiles(Table, String)} instead. */
+ @Deprecated
+ public Set<DataFile> fetchNewDataFiles(Table table, String fileSetId) {
+ return fetchNewFiles(table, fileSetId);
}
+ /** @deprecated will be removed in 1.4.0; use {@link #fetchSetIds(Table)} instead */
+ @Deprecated
public Set<String> fetchSetIDs(Table table) {
- return resultMap.keySet().stream()
- .filter(e -> e.first().equals(tableUUID(table)))
- .map(Pair::second)
- .collect(Collectors.toSet());
- }
-
- private Pair<String, String> toID(Table table, String setID) {
- String tableUUID = tableUUID(table);
- return Pair.of(tableUUID, setID);
- }
-
- private String tableUUID(Table table) {
- TableOperations ops = ((HasTableOperations) table).operations();
- return ops.current().uuid();
+ return fetchSetIds(table);
}
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/PositionDeletesRewriteCoordinator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/PositionDeletesRewriteCoordinator.java
new file mode 100644
index 0000000000..c7568005e2
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/PositionDeletesRewriteCoordinator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.DeleteFile;
+
+public class PositionDeletesRewriteCoordinator extends BaseFileRewriteCoordinator<DeleteFile> {
+
+ private static final PositionDeletesRewriteCoordinator INSTANCE =
+ new PositionDeletesRewriteCoordinator();
+
+ private PositionDeletesRewriteCoordinator() {}
+
+ public static PositionDeletesRewriteCoordinator get() {
+ return INSTANCE;
+ }
+}
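
The commit step itself is not part of this class: conceptually, a rewrite action replaces the delete files referenced by the staged scan tasks with the files staged in this coordinator. A rough sketch of that step, assuming the four-argument RewriteFiles overload in core that can replace delete files; this is illustrative only, not code from the patch:

    List<PositionDeletesScanTask> tasks = ScanTaskSetManager.get().fetchTasks(table, fileSetId);
    Set<DeleteFile> rewrittenDeleteFiles =
        tasks.stream().map(ContentScanTask::file).collect(Collectors.toSet());
    Set<DeleteFile> addedDeleteFiles =
        PositionDeletesRewriteCoordinator.get().fetchNewFiles(table, fileSetId);

    table
        .newRewrite()
        .rewriteFiles(
            ImmutableSet.of(), rewrittenDeleteFiles,  // data / delete files to remove
            ImmutableSet.of(), addedDeleteFiles)      // data / delete files to add
        .commit();

    PositionDeletesRewriteCoordinator.get().clearRewrite(table, fileSetId);
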
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
new file mode 100644
index 0000000000..0aebb6bdb2
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * {@link Write} class for rewriting position delete files from Spark. Responsible for creating
+ * {@link PositionDeleteBatchWrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite position delete files. Hence, it
+ * assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all
+ * have the same partition spec id and partition values.
+ */
+public class SparkPositionDeletesRewrite implements Write {
+
+ private final JavaSparkContext sparkContext;
+ private final Table table;
+ private final String queryId;
+ private final FileFormat format;
+ private final long targetFileSize;
+ private final Schema writeSchema;
+ private final StructType dsSchema;
+ private final String fileSetId;
+ private final int specId;
+ private final StructLike partition;
+
+ /**
+ * Constructs a {@link SparkPositionDeletesRewrite}.
+ *
+ * @param spark Spark session
+ * @param table instance of {@link PositionDeletesTable}
+ * @param writeConf Spark write config
+ * @param writeInfo Spark write info
+ * @param writeSchema Iceberg output schema
+ * @param dsSchema schema of original incoming position deletes dataset
+ * @param specId spec id of position deletes
+ * @param partition partition value of position deletes
+ */
+ SparkPositionDeletesRewrite(
+ SparkSession spark,
+ Table table,
+ SparkWriteConf writeConf,
+ LogicalWriteInfo writeInfo,
+ Schema writeSchema,
+ StructType dsSchema,
+ int specId,
+ StructLike partition) {
+ this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ this.table = table;
+ this.queryId = writeInfo.queryId();
+ this.format = writeConf.deleteFileFormat();
+ this.targetFileSize = writeConf.targetDeleteFileSize();
+ this.writeSchema = writeSchema;
+ this.dsSchema = dsSchema;
+ this.fileSetId = writeConf.rewrittenFileSetId();
+ this.specId = specId;
+ this.partition = partition;
+ }
+
+ @Override
+ public BatchWrite toBatch() {
+ return new PositionDeleteBatchWrite();
+ }
+
+ /** {@link BatchWrite} class for rewriting position delete files from Spark */
+ class PositionDeleteBatchWrite implements BatchWrite {
+
+ @Override
+ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+ // broadcast the table metadata as the writer factory will be sent to executors
+ Broadcast<Table> tableBroadcast =
+ sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ return new PositionDeletesWriterFactory(
+ tableBroadcast,
+ queryId,
+ format,
+ targetFileSize,
+ writeSchema,
+ dsSchema,
+ specId,
+ partition);
+ }
+
+ @Override
+ public void commit(WriterCommitMessage[] messages) {
+ PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
+ coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages)));
+ }
+
+ @Override
+ public void abort(WriterCommitMessage[] messages) {
+ SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+ }
+
+ private List<DeleteFile> files(WriterCommitMessage[] messages) {
+ List<DeleteFile> files = Lists.newArrayList();
+
+ for (WriterCommitMessage message : messages) {
+ if (message != null) {
+ DeleteTaskCommit taskCommit = (DeleteTaskCommit) message;
+ files.addAll(Arrays.asList(taskCommit.files()));
+ }
+ }
+
+ return files;
+ }
+ }
+
+ /**
+ * Writer factory for position deletes metadata table. Responsible for creating {@link
+ * DeleteWriter}.
+ *
+ * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an
+ * assumption that all incoming deletes belong to the same partition, and that the incoming
+ * dataset is from {@link ScanTaskSetManager}.
+ */
+ static class PositionDeletesWriterFactory implements DataWriterFactory {
+ private final Broadcast<Table> tableBroadcast;
+ private final String queryId;
+ private final FileFormat format;
+ private final Long targetFileSize;
+ private final Schema writeSchema;
+ private final StructType dsSchema;
+ private final int specId;
+ private final StructLike partition;
+
+ PositionDeletesWriterFactory(
+ Broadcast<Table> tableBroadcast,
+ String queryId,
+ FileFormat format,
+ long targetFileSize,
+ Schema writeSchema,
+ StructType dsSchema,
+ int specId,
+ StructLike partition) {
+ this.tableBroadcast = tableBroadcast;
+ this.queryId = queryId;
+ this.format = format;
+ this.targetFileSize = targetFileSize;
+ this.writeSchema = writeSchema;
+ this.dsSchema = dsSchema;
+ this.specId = specId;
+ this.partition = partition;
+ }
+
+ @Override
+ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+ Table table = tableBroadcast.value();
+
+ OutputFileFactory deleteFileFactory =
+ OutputFileFactory.builderFor(table, partitionId, taskId)
+ .format(format)
+ .operationId(queryId)
+ .suffix("deletes")
+ .build();
+
+ Schema positionDeleteRowSchema = positionDeleteRowSchema();
+ StructType deleteSparkType = deleteSparkType();
+ StructType deleteSparkTypeWithoutRow = deleteSparkTypeWithoutRow();
+
+ SparkFileWriterFactory writerFactoryWithRow =
+ SparkFileWriterFactory.builderFor(table)
+ .deleteFileFormat(format)
+ .positionDeleteRowSchema(positionDeleteRowSchema)
+ .positionDeleteSparkType(deleteSparkType)
+ .build();
+ SparkFileWriterFactory writerFactoryWithoutRow =
+ SparkFileWriterFactory.builderFor(table)
+ .deleteFileFormat(format)
+ .positionDeleteSparkType(deleteSparkTypeWithoutRow)
+ .build();
+
+ return new DeleteWriter(
+ table,
+ writerFactoryWithRow,
+ writerFactoryWithoutRow,
+ deleteFileFactory,
+ targetFileSize,
+ dsSchema,
+ specId,
+ partition);
+ }
+
+ private Schema positionDeleteRowSchema() {
+ return new Schema(
+ writeSchema
+ .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+ .type()
+ .asStructType()
+ .fields());
+ }
+
+ private StructType deleteSparkType() {
+ return new StructType(
+ new StructField[] {
+ dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+ });
+ }
+
+ private StructType deleteSparkTypeWithoutRow() {
+ return new StructType(
+ new StructField[] {
+ dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+ });
+ }
+ }
+
+ /**
+ * Writer for position deletes metadata table.
+ *
+ * <p>Iceberg requires a delete file schema either to include 'row' as a required field or to omit
+ * 'row' altogether, which keeps delete file statistics on the 'row' column accurate. Hence, if the
+ * source position deletes contain both null and non-null rows, this writer redirects deletes with
+ * a null 'row' to one file writer and deletes with a non-null 'row' to another file writer.
+ *
+ * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an
+ * assumption that all incoming deletes belong to the same partition.
+ */
+ private static class DeleteWriter implements DataWriter<InternalRow> {
+ private final SparkFileWriterFactory writerFactoryWithRow;
+ private final SparkFileWriterFactory writerFactoryWithoutRow;
+ private final OutputFileFactory deleteFileFactory;
+ private final long targetFileSize;
+ private final PositionDelete<InternalRow> positionDelete;
+ private final FileIO io;
+ private final PartitionSpec spec;
+ private final int fileOrdinal;
+ private final int positionOrdinal;
+ private final int rowOrdinal;
+ private final int rowSize;
+ private final StructLike partition;
+
+ private ClusteredPositionDeleteWriter<InternalRow> writerWithRow;
+ private ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow;
+ private boolean closed = false;
+
+ /**
+ * Constructs a {@link DeleteWriter}.
+ *
+ * @param table position deletes metadata table
+ * @param writerFactoryWithRow writer factory for deletes with non-null 'row'
+ * @param writerFactoryWithoutRow writer factory for deletes with null 'row'
+ * @param deleteFileFactory delete file factory
+ * @param targetFileSize target file size
+ * @param dsSchema schema of incoming dataset of position deletes
+ * @param specId partition spec id of incoming position deletes. All incoming position deletes
+ * are required to have the same spec id.
+ * @param partition partition value of incoming position deletes. All incoming position deletes
+ * are required to have the same partition.
+ */
+ DeleteWriter(
+ Table table,
+ SparkFileWriterFactory writerFactoryWithRow,
+ SparkFileWriterFactory writerFactoryWithoutRow,
+ OutputFileFactory deleteFileFactory,
+ long targetFileSize,
+ StructType dsSchema,
+ int specId,
+ StructLike partition) {
+ this.deleteFileFactory = deleteFileFactory;
+ this.targetFileSize = targetFileSize;
+ this.writerFactoryWithRow = writerFactoryWithRow;
+ this.writerFactoryWithoutRow = writerFactoryWithoutRow;
+ this.positionDelete = PositionDelete.create();
+ this.io = table.io();
+ this.spec = table.specs().get(specId);
+ this.partition = partition;
+
+ this.fileOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_PATH.name());
+ this.positionOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_POS.name());
+
+ this.rowOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME);
+ DataType type = dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME).dataType();
+ Preconditions.checkArgument(
+ type instanceof StructType, "Expected row as struct type but was %s", type);
+ this.rowSize = ((StructType) type).size();
+ }
+
+ @Override
+ public void write(InternalRow record) throws IOException {
+ String file = record.getString(fileOrdinal);
+ long position = record.getLong(positionOrdinal);
+ InternalRow row = record.getStruct(rowOrdinal, rowSize);
+ if (row != null) {
+ positionDelete.set(file, position, row);
+ lazyWriterWithRow().write(positionDelete, spec, partition);
+ } else {
+ positionDelete.set(file, position, null);
+ lazyWriterWithoutRow().write(positionDelete, spec, partition);
+ }
+ }
+
+ @Override
+ public WriterCommitMessage commit() throws IOException {
+ close();
+ return new DeleteTaskCommit(allDeleteFiles());
+ }
+
+ @Override
+ public void abort() throws IOException {
+ close();
+ SparkCleanupUtil.deleteTaskFiles(io, allDeleteFiles());
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ if (writerWithRow != null) {
+ writerWithRow.close();
+ }
+ if (writerWithoutRow != null) {
+ writerWithoutRow.close();
+ }
+ this.closed = true;
+ }
+ }
+
+ private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithRow() {
+ if (writerWithRow == null) {
+ this.writerWithRow =
+ new ClusteredPositionDeleteWriter<>(
+ writerFactoryWithRow, deleteFileFactory, io, targetFileSize);
+ }
+ return writerWithRow;
+ }
+
+ private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithoutRow() {
+ if (writerWithoutRow == null) {
+ this.writerWithoutRow =
+ new ClusteredPositionDeleteWriter<>(
+ writerFactoryWithoutRow, deleteFileFactory, io, targetFileSize);
+ }
+ return writerWithoutRow;
+ }
+
+ private List<DeleteFile> allDeleteFiles() {
+ List<DeleteFile> allDeleteFiles = Lists.newArrayList();
+ if (writerWithRow != null) {
+ allDeleteFiles.addAll(writerWithRow.result().deleteFiles());
+ }
+ if (writerWithoutRow != null) {
+ allDeleteFiles.addAll(writerWithoutRow.result().deleteFiles());
+ }
+ return allDeleteFiles;
+ }
+ }
+
+ public static class DeleteTaskCommit implements WriterCommitMessage {
+ private final DeleteFile[] taskFiles;
+
+ DeleteTaskCommit(List<DeleteFile> deleteFiles) {
+ this.taskFiles = deleteFiles.toArray(new DeleteFile[0]);
+ }
+
+ DeleteFile[] files() {
+ return taskFiles;
+ }
+ }
+}
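
A note on the split performed by DeleteWriter: a position delete file schema either includes 'row' as a required field or omits it entirely, so one output file can never mix deletes with and without the deleted row. The per-record routing, condensed from the write(...) method above and shown here only to highlight that invariant:

    String path = record.getString(fileOrdinal);
    long pos = record.getLong(positionOrdinal);
    InternalRow row = record.getStruct(rowOrdinal, rowSize);
    if (row != null) {
      // deletes carrying the deleted row go to files written with the 'row'-bearing schema
      positionDelete.set(path, pos, row);
      lazyWriterWithRow().write(positionDelete, spec, partition);
    } else {
      // deletes without a row go to files whose schema omits 'row'
      positionDelete.set(path, pos, null);
      lazyWriterWithoutRow().write(positionDelete, spec, partition);
    }
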
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
new file mode 100644
index 0000000000..cc5c987fc4
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link
+ * SparkPositionDeletesRewrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an
+ * assumption that all incoming deletes belong to the same partition, and that the incoming
+ * dataset is from {@link ScanTaskSetManager}.
+ */
+public class SparkPositionDeletesRewriteBuilder implements WriteBuilder {
+
+ private final SparkSession spark;
+ private final Table table;
+ private final SparkWriteConf writeConf;
+ private final LogicalWriteInfo writeInfo;
+ private final StructType dsSchema;
+ private final Schema writeSchema;
+
+ SparkPositionDeletesRewriteBuilder(
+ SparkSession spark, Table table, String branch, LogicalWriteInfo info) {
+ this.spark = spark;
+ this.table = table;
+ this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
+ this.writeInfo = info;
+ this.dsSchema = info.schema();
+ this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive());
+ }
+
+ @Override
+ public Write build() {
+ String fileSetId = writeConf.rewrittenFileSetId();
+ boolean handleTimestampWithoutZone = writeConf.handleTimestampWithoutZone();
+
+ Preconditions.checkArgument(
+ fileSetId != null, "Can only write to %s via actions", table.name());
+ Preconditions.checkArgument(
+ handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
+ SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
+
+ // all files of rewrite group have same partition and spec id
+ ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+ List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId);
+ Preconditions.checkArgument(
+ tasks != null && tasks.size() > 0, "No scan tasks found for %s", fileSetId);
+
+ int specId = specId(fileSetId, tasks);
+ StructLike partition = partition(fileSetId, tasks);
+
+ return new SparkPositionDeletesRewrite(
+ spark, table, writeConf, writeInfo, writeSchema, dsSchema, specId, partition);
+ }
+
+ private int specId(String fileSetId, List<PositionDeletesScanTask> tasks) {
+ Set<Integer> specIds = tasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet());
+ Preconditions.checkArgument(
+ specIds.size() == 1,
+ "All scan tasks of %s are expected to have same spec id, but got %s",
+ fileSetId,
+ Joiner.on(",").join(specIds));
+ return tasks.get(0).spec().specId();
+ }
+
+ private StructLike partition(String fileSetId, List<PositionDeletesScanTask> tasks) {
+ StructLikeSet partitions = StructLikeSet.create(tasks.get(0).spec().partitionType());
+ tasks.stream().map(ContentScanTask::partition).forEach(partitions::add);
+ Preconditions.checkArgument(
+ partitions.size() == 1,
+ "All scan tasks of %s are expected to have the same partition, but got %s",
+ fileSetId,
+ Joiner.on(",").join(partitions));
+ return tasks.get(0).partition();
+ }
+}
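
The builder above only accepts writes driven by a rewrite action: rewrittenFileSetId must be set, and all staged tasks must share one spec id and one partition value. A plain append to the metadata table is therefore expected to fail the first precondition; a hypothetical example (not from the patch):

    // Without SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, build() is expected to fail with
    // an IllegalArgumentException: "Can only write to <table> via actions".
    Dataset<Row> df = spark.table("catalog.db.tbl.position_deletes");
    df.writeTo("catalog.db.tbl.position_deletes").append();
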
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index f143071572..d845284513 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
@@ -279,7 +280,11 @@ public class SparkTable
Preconditions.checkArgument(
snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
- return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
+ if (icebergTable instanceof PositionDeletesTable) {
+ return new SparkPositionDeletesRewriteBuilder(sparkSession(), icebergTable, branch, info);
+ } else {
+ return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
+ }
}
@Override
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index b6b8b5b09d..bb062a5ba4 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1679,7 +1679,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
private Set<String> cacheContents(Table table) {
return ImmutableSet.<String>builder()
.addAll(manager.fetchSetIds(table))
- .addAll(coordinator.fetchSetIDs(table))
+ .addAll(coordinator.fetchSetIds(table))
.build();
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 2be5c8444d..2ec4f2f4f9 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -29,11 +30,13 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
@@ -45,20 +48,30 @@ import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkStructLike;
-import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.junit.Assert;
import org.junit.Rule;
@@ -68,20 +81,51 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
-public class TestPositionDeletesTable extends SparkTestBase {
+public class TestPositionDeletesTable extends SparkCatalogTestBase {
public static final Schema SCHEMA =
new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "data", Types.StringType.get()));
+ private static final Map<String, String> CATALOG_PROPS =
+ ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default",
+ "cache-enabled", "false");
+ private static final List<String> NON_PATH_COLS =
+ ImmutableList.of("file_path", "pos", "row", "partition", "spec_id");
+
private final FileFormat format;
- @Parameterized.Parameters(name = "fileFormat = {0}")
+ @Parameterized.Parameters(
+ name =
+ "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}")
public static Object[][] parameters() {
- return new Object[][] {{FileFormat.PARQUET}, {FileFormat.AVRO}, {FileFormat.ORC}};
+ return new Object[][] {
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ CATALOG_PROPS,
+ FileFormat.PARQUET
+ },
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ CATALOG_PROPS,
+ FileFormat.AVRO
+ },
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ CATALOG_PROPS,
+ FileFormat.ORC
+ },
+ };
}
- public TestPositionDeletesTable(FileFormat format) {
+ public TestPositionDeletesTable(
+ String catalogName, String implementation, Map<String, String> config, FileFormat format) {
+ super(catalogName, implementation, config);
this.format = format;
}
@@ -678,8 +722,8 @@ public class TestPositionDeletesTable extends SparkTestBase {
}
@Test
- public void testScanTaskSetManager() throws IOException {
- String tableName = "scan_task_set_manager";
+ public void testWrite() throws IOException, NoSuchTableException {
+ String tableName = "test_write";
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
Table tab = createTable(tableName, SCHEMA, spec);
@@ -689,19 +733,422 @@ public class TestPositionDeletesTable extends SparkTestBase {
// Add position deletes for both partitions
Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
- Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ Table posDeletesTable =
+ MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+ String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+ for (String partValue : ImmutableList.of("a", "b")) {
+ try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
+ String fileSetID = UUID.randomUUID().toString();
+ stageTask(tab, fileSetID, tasks);
+
+ Dataset<Row> scanDF =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+ .load(posDeletesTableName);
+
+ Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+ scanDF
+ .writeTo(posDeletesTableName)
+ .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+ .append();
+
+ commit(tab, posDeletesTable, fileSetID, 1);
+ }
+ }
+
+ // Prepare expected values (without 'delete_file_path' as these have been rewritten)
+ GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ Record partitionB = partitionRecordTemplate.copy("data", "b");
+ StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, null);
+ StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, null);
+ StructLikeSet allExpected =
+ StructLikeSet.create(
+ TypeUtil.selectNot(
+ posDeletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID))
+ .asStruct());
+ allExpected.addAll(expectedA);
+ allExpected.addAll(expectedB);
+
+ // Compare values without 'delete_file_path' as these have been rewritten
+ StructLikeSet actual = actual(tableName, tab, null, NON_PATH_COLS);
+ Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testWriteUnpartitionedNullRows() throws Exception {
+ String tableName = "write_null_rows";
+ Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+
+ DataFile dFile = dataFile(tab);
+ tab.newAppend().appendFile(dFile).commit();
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
+ deletes.add(Pair.of(dFile.path(), 0L));
+ deletes.add(Pair.of(dFile.path(), 1L));
+ Pair<DeleteFile, CharSequenceSet> posDeletes =
+ FileHelpers.writeDeleteFile(
+ tab, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+ tab.newRowDelta().addDeletes(posDeletes.first()).commit();
+
+ Table posDeletesTable =
+ MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+ String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+ try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
+ String fileSetID = UUID.randomUUID().toString();
+ stageTask(tab, fileSetID, tasks);
+
+ Dataset<Row> scanDF =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+ .load(posDeletesTableName);
+ Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+ scanDF
+ .writeTo(posDeletesTableName)
+ .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+ .append();
+
+ commit(tab, posDeletesTable, fileSetID, 1);
+ }
+
+ // Compare values without 'delete_file_path' as these have been rewritten
+ StructLikeSet actual =
+ actual(tableName, tab, null, ImmutableList.of("file_path", "pos", "row", "spec_id"));
+
+ List<PositionDelete<?>> expectedDeletes =
+ Lists.newArrayList(positionDelete(dFile.path(), 0L), positionDelete(dFile.path(), 1L));
+ StructLikeSet expected = expected(tab, expectedDeletes, null, null);
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testWriteMixedRows() throws Exception {
+ String tableName = "write_mixed_rows";
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table tab = createTable(tableName, SCHEMA, spec);
+
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ // Add a delete file with row and without row
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
+ deletes.add(Pair.of(dataFileA.path(), 0L));
+ deletes.add(Pair.of(dataFileA.path(), 1L));
+ Pair<DeleteFile, CharSequenceSet> deletesWithoutRow =
+ FileHelpers.writeDeleteFile(
+ tab, Files.localOutput(temp.newFile()), TestHelpers.Row.of("a"), deletes);
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesWithRow = deleteFile(tab, dataFileB, "b");
+
+ tab.newRowDelta()
+ .addDeletes(deletesWithoutRow.first())
+ .addDeletes(deletesWithRow.second())
+ .commit();
+
+ // rewrite delete files
+ Table posDeletesTable =
+ MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+ String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+ for (String partValue : ImmutableList.of("a", "b")) {
+ try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
+ String fileSetID = UUID.randomUUID().toString();
+ stageTask(tab, fileSetID, tasks);
+
+ Dataset<Row> scanDF =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+ .load(posDeletesTableName);
+ Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+ scanDF
+ .writeTo(posDeletesTableName)
+ .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+ .append();
+
+ commit(tab, posDeletesTable, fileSetID, 1);
+ }
+ }
+
+ // Compare values without 'delete_file_path' as these have been rewritten
+ StructLikeSet actual =
+ actual(
+ tableName,
+ tab,
+ null,
+ ImmutableList.of("file_path", "pos", "row", "partition", "spec_id"));
+
+ // Prepare expected values
+ GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ Record partitionB = partitionRecordTemplate.copy("data", "b");
+ StructLikeSet allExpected =
+ StructLikeSet.create(
+ TypeUtil.selectNot(
+ posDeletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID))
+ .asStruct());
+ allExpected.addAll(
+ expected(
+ tab,
+ Lists.newArrayList(
+ positionDelete(dataFileA.path(), 0L), positionDelete(dataFileA.path(), 1L)),
+ partitionA,
+ null));
+ allExpected.addAll(expected(tab, deletesWithRow.first(), partitionB, null));
+
+ Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testWritePartitionEvolutionAdd() throws Exception {
+ // Create unpartitioned table
+ String tableName = "write_partition_evolution_add";
+ Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+ int specId0 = tab.spec().specId();
+
+ // Add files with unpartitioned spec
+ DataFile dataFileUnpartitioned = dataFile(tab);
+ tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+ deleteFile(tab, dataFileUnpartitioned);
+ tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+ // Switch partition spec to (data)
+ tab.updateSpec().addField("data").commit();
+ int specId1 = tab.spec().specId();
+
+ // Add files with new spec (data)
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
Table posDeletesTable =
MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
- String posDeletesTableName = "default." + tableName + ".position_deletes";
+ String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+ // Read/write back unpartitioned data
+ try (CloseableIterable<ScanTask> tasks =
+ posDeletesTable.newBatchScan().filter(Expressions.isNull("partition.data")).planFiles()) {
+ String fileSetID = UUID.randomUUID().toString();
+ stageTask(tab, fileSetID, tasks);
+
+ Dataset<Row> scanDF =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+ .load(posDeletesTableName);
+ Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+ scanDF
+ .writeTo(posDeletesTableName)
+ .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+ .append();
+
+ commit(tab, posDeletesTable, fileSetID, 1);
+ }
+
+ // Select deletes from unpartitioned data
+ // Compare values without 'delete_file_path' as these have been rewritten
+ GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+ Record unpartitionedRecord = partitionRecordTemplate.copy("data", null);
+ StructLikeSet expectedUnpartitioned =
+ expected(tab, deletesUnpartitioned.first(), unpartitionedRecord, specId0, null);
+ StructLikeSet actualUnpartitioned =
+ actual(tableName, tab, "partition.data IS NULL", NON_PATH_COLS);
+ Assert.assertEquals(
+ "Position Delete table should contain expected rows",
+ expectedUnpartitioned,
+ actualUnpartitioned);
+
+ // Read/write back new partition spec (data)
+ for (String partValue : ImmutableList.of("a", "b")) {
+ try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
+ String fileSetID = UUID.randomUUID().toString();
+ stageTask(tab, fileSetID, tasks);
+
+ Dataset<Row> scanDF =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+ .load(posDeletesTableName);
+ Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+ scanDF
+ .writeTo(posDeletesTableName)
+ .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+ .append();
+
+ // commit the rewrite
+ commit(tab, posDeletesTable, fileSetID, 1);
+ }
+ }
+
+ // Select deletes from new spec (data)
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ Record partitionB = partitionRecordTemplate.copy("data", "b");
+ StructLikeSet expectedAll =
+ StructLikeSet.create(
+ TypeUtil.selectNot(
+ posDeletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID))
+ .asStruct());
+ expectedAll.addAll(expected(tab, deletesA.first(), partitionA, specId1, null));
+ expectedAll.addAll(expected(tab, deletesB.first(), partitionB, specId1, null));
+ StructLikeSet actualAll =
+ actual(tableName, tab, "partition.data = 'a' OR partition.data = 'b'", NON_PATH_COLS);
+ Assert.assertEquals(
+ "Position Delete table should contain expected rows", expectedAll, actualAll);
+
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testWritePartitionEvolutionDisallowed() throws Exception {
+ // Create unpartitioned table
+ String tableName = "write_partition_evolution_write";
+ Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+
+ // Add files with unpartitioned spec
+ DataFile dataFileUnpartitioned = dataFile(tab);
+ tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+ deleteFile(tab, dataFileUnpartitioned);
+ tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+ Table posDeletesTable =
+ MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+ String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+ Dataset<Row> scanDF;
+ String fileSetID = UUID.randomUUID().toString();
try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
+ stageTask(tab, fileSetID, tasks);
+
+ scanDF =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+ .load(posDeletesTableName);
+ Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+
+ // Add a partition field to render the original unpartitioned dataset uncommittable
+ tab.updateSpec().addField("data").commit();
+ }
+
+ Assert.assertThrows(
+ AnalysisException.class,
+ () ->
+ scanDF
+ .writeTo(posDeletesTableName)
+ .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+ .append());
+
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testWriteSchemaEvolutionAdd() throws Exception {
+ // Create table with original schema
+ String tableName = "write_schema_evolution_add";
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table tab = createTable(tableName, SCHEMA, spec);
+
+ // Add files with original schema
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Evolve the schema by adding new columns
+ tab.updateSchema()
+ .addColumn("new_col_1", Types.IntegerType.get())
+ .addColumn("new_col_2", Types.IntegerType.get())
+ .commit();
+
+ // Add files with new schema
+ DataFile dataFileC = dataFile(tab, "c");
+ DataFile dataFileD = dataFile(tab, "d");
+ tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
+ tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();
+
+ Table posDeletesTable =
+ MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+ String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+ // rewrite files of old schema
+ try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", "a")) {
String fileSetID = UUID.randomUUID().toString();
+ stageTask(tab, fileSetID, tasks);
+
+ Dataset<Row> scanDF =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+ .load(posDeletesTableName);
+
+ Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+ scanDF
+ .writeTo(posDeletesTableName)
+ .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+ .append();
+
+ commit(tab, posDeletesTable, fileSetID, 1);
+ }
+
+ // Select deletes from old schema
+ GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ // pad expected delete rows with null values for new columns
+ List<PositionDelete<?>> expectedDeletesA = deletesA.first();
+ expectedDeletesA.forEach(
+ d -> {
+ GenericRecord nested = d.get(2, GenericRecord.class);
+ GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
+ padded.set(0, nested.get(0));
+ padded.set(1, nested.get(1));
+ padded.set(2, null);
+ padded.set(3, null);
+ d.set(2, padded);
+ });
+ StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, null);
+ StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS);
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
- ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
- taskSetManager.stageTasks(tab, fileSetID, Lists.newArrayList(tasks));
+ // rewrite files of new schema
+ try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", "c")) {
+ String fileSetID = UUID.randomUUID().toString();
+ stageTask(tab, fileSetID, tasks);
- // read and pack original 4 files into 2 splits
Dataset<Row> scanDF =
spark
.read()
@@ -710,30 +1157,171 @@ public class TestPositionDeletesTable extends SparkTestBase {
.option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
.load(posDeletesTableName);
- Assert.assertEquals("Num partitions should match", 2, scanDF.javaRDD().getNumPartitions());
+ Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+ scanDF
+ .writeTo(posDeletesTableName)
+ .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+ .append();
+
+ commit(tab, posDeletesTable, fileSetID, 1);
+ }
+
+ // Select deletes from new schema
+ Record partitionC = partitionRecordTemplate.copy("data", "c");
+ StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, null);
+ StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS);
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testWriteSchemaEvolutionRemove() throws Exception {
+ // Create table with original schema
+ String tableName = "write_schema_evolution_remove";
+ Schema oldSchema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "new_col_1", Types.IntegerType.get()),
+ Types.NestedField.optional(4, "new_col_2", Types.IntegerType.get()));
+ PartitionSpec spec = PartitionSpec.builderFor(oldSchema).identity("data").build();
+ Table tab = createTable(tableName, oldSchema, spec);
+
+ // Add files with original schema
+ DataFile dataFileA = dataFile(tab, "a");
+ DataFile dataFileB = dataFile(tab, "b");
+ tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+ tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+ // Remove columns from the schema
+ tab.updateSchema().deleteColumn("new_col_1").deleteColumn("new_col_2").commit();
+
+ // Add files with new schema
+ DataFile dataFileC = dataFile(tab, "c");
+ DataFile dataFileD = dataFile(tab, "d");
+ tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
+ tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();
- dropTable(tableName);
+ Table posDeletesTable =
+ MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+ String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+ // rewrite files
+ for (String partValue : ImmutableList.of("a", "b", "c", "d")) {
+ try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
+ String fileSetID = UUID.randomUUID().toString();
+ stageTask(tab, fileSetID, tasks);
+
+ Dataset<Row> scanDF =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+ .load(posDeletesTableName);
+ Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+ scanDF
+ .writeTo(posDeletesTableName)
+ .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+ .append();
+
+ commit(tab, posDeletesTable, fileSetID, 1);
+ }
}
+
+ // Select deletes from old schema
+ GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+ Record partitionA = partitionRecordTemplate.copy("data", "a");
+ // remove deleted columns from expected result
+ List<PositionDelete<?>> expectedDeletesA = deletesA.first();
+ expectedDeletesA.forEach(
+ d -> {
+ GenericRecord nested = d.get(2, GenericRecord.class);
+ GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
+ padded.set(0, nested.get(0));
+ padded.set(1, nested.get(1));
+ d.set(2, padded);
+ });
+ StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, null);
+ StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS);
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+ // Select deletes from new schema
+ Record partitionC = partitionRecordTemplate.copy("data", "c");
+ StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, null);
+ StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS);
+
+ Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
+ dropTable(tableName);
+ }
+
+ @Test
+ public void testNormalWritesNotAllowed() throws IOException {
+ String tableName = "test_normal_write_not_allowed";
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table tab = createTable(tableName, SCHEMA, spec);
+
+ DataFile dataFileA = dataFile(tab, "a");
+ tab.newAppend().appendFile(dataFileA).commit();
+
+ Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+ tab.newRowDelta().addDeletes(deletesA.second()).commit();
+
+ String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+ Dataset<Row> scanDF = spark.read().format("iceberg").load(posDeletesTableName);
+
+ Assert.assertThrows(
+ "position_deletes table can only be written by RewriteDeleteFiles",
+ IllegalArgumentException.class,
+ () -> scanDF.writeTo(posDeletesTableName).append());
+
+ dropTable(tableName);
}
private StructLikeSet actual(String tableName, Table table) {
- return actual(tableName, table, null);
+ return actual(tableName, table, null, null);
}
private StructLikeSet actual(String tableName, Table table, String filter) {
+ return actual(tableName, table, filter, null);
+ }
+
+ private StructLikeSet actual(String tableName, Table table, String filter, List<String> cols) {
Dataset<Row> df =
- spark.read().format("iceberg").load("default." + tableName + ".position_deletes");
+ spark
+ .read()
+ .format("iceberg")
+ .load(catalogName + ".default." + tableName + ".position_deletes");
if (filter != null) {
df = df.filter(filter);
}
+ if (cols != null) {
+ df = df.select(cols.get(0), cols.subList(1, cols.size()).toArray(new String[0]));
+ }
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
Types.StructType projection = deletesTable.schema().asStruct();
+ if (cols != null) {
+ projection =
+ Types.StructType.of(
+ projection.fields().stream()
+ .filter(f -> cols.contains(f.name()))
+ .collect(Collectors.toList()));
+ }
+ Types.StructType finalProjection = projection;
StructLikeSet set = StructLikeSet.create(projection);
df.collectAsList()
.forEach(
row -> {
- SparkStructLike rowWrapper = new SparkStructLike(projection);
+ SparkStructLike rowWrapper = new SparkStructLike(finalProjection);
set.add(rowWrapper.wrap(row));
});
@@ -747,11 +1335,12 @@ public class TestPositionDeletesTable extends SparkTestBase {
"2",
TableProperties.DEFAULT_FILE_FORMAT,
format.toString());
- return catalog.createTable(TableIdentifier.of("default", name), schema, spec, properties);
+ return validationCatalog.createTable(
+ TableIdentifier.of("default", name), schema, spec, properties);
}
protected void dropTable(String name) {
- catalog.dropTable(TableIdentifier.of("default", name));
+ validationCatalog.dropTable(TableIdentifier.of("default", name), false);
}
private PositionDelete<GenericRecord> positionDelete(CharSequence path, Long position) {
@@ -781,6 +1370,13 @@ public class TestPositionDeletesTable extends SparkTestBase {
MetadataTableUtils.createMetadataTableInstance(
testTable, MetadataTableType.POSITION_DELETES);
Types.StructType posDeleteSchema = deletesTable.schema().asStruct();
+ // Do not compare file paths
+ if (deleteFilePath == null) {
+ posDeleteSchema =
+ TypeUtil.selectNot(
+ deletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID))
+ .asStruct();
+ }
final Types.StructType finalSchema = posDeleteSchema;
StructLikeSet set = StructLikeSet.create(posDeleteSchema);
deletes.stream()
@@ -794,7 +1390,9 @@ public class TestPositionDeletesTable extends SparkTestBase {
record.setField("partition", partitionStruct);
}
record.setField("spec_id", specId);
- record.setField("delete_file_path", deleteFilePath);
+ if (deleteFilePath != null) {
+ record.setField("delete_file_path", deleteFilePath);
+ }
return record;
})
.forEach(set::add);
@@ -890,7 +1488,7 @@ public class TestPositionDeletesTable extends SparkTestBase {
positionDelete(
tab.schema(),
dataFile.path(),
- 0L,
+ 1L,
idPartition != null ? idPartition : 61,
dataPartition != null ? dataPartition : "r"));
@@ -910,4 +1508,59 @@ public class TestPositionDeletesTable extends SparkTestBase {
tab, Files.localOutput(temp.newFile()), partitionInfo, deletes);
return Pair.of(deletes, deleteFile);
}
+
+ private <T extends ScanTask> void stageTask(
+ Table tab, String fileSetID, CloseableIterable<T> tasks) {
+ ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+ taskSetManager.stageTasks(tab, fileSetID, Lists.newArrayList(tasks));
+ }
+
+ private void commit(
+ Table baseTab,
+ Table posDeletesTable,
+ String fileSetID,
+ int expectedSourceFiles,
+ int expectedTargetFiles) {
+ PositionDeletesRewriteCoordinator rewriteCoordinator = PositionDeletesRewriteCoordinator.get();
+ Set<DeleteFile> rewrittenFiles =
+ ScanTaskSetManager.get().fetchTasks(posDeletesTable, fileSetID).stream()
+ .map(t -> ((PositionDeletesScanTask) t).file())
+ .collect(Collectors.toSet());
+ Set<DeleteFile> addedFiles = rewriteCoordinator.fetchNewFiles(posDeletesTable, fileSetID);
+
+ // Assert that the rewritten and added file counts match expectations and that their paths differ
+ Assert.assertEquals(expectedSourceFiles, rewrittenFiles.size());
+ Assert.assertEquals(expectedTargetFiles, addedFiles.size());
+
+ List<String> sortedAddedFiles =
+ addedFiles.stream().map(f -> f.path().toString()).sorted().collect(Collectors.toList());
+ List<String> sortedRewrittenFiles =
+ rewrittenFiles.stream().map(f -> f.path().toString()).sorted().collect(Collectors.toList());
+ Assert.assertNotEquals("Lists should not be the same", sortedAddedFiles, sortedRewrittenFiles);
+
+ baseTab
+ .newRewrite()
+ .rewriteFiles(ImmutableSet.of(), rewrittenFiles, ImmutableSet.of(), addedFiles)
+ .commit();
+ }
+
+ private void commit(Table baseTab, Table posDeletesTable, String fileSetID, int expectedFiles) {
+ commit(baseTab, posDeletesTable, fileSetID, expectedFiles, expectedFiles);
+ }
+
+ private CloseableIterable<ScanTask> tasks(
+ Table posDeletesTable, String partitionColumn, String partitionValue) {
+
+ Expression filter = Expressions.equal("partition." + partitionColumn, partitionValue);
+ CloseableIterable<ScanTask> files = posDeletesTable.newBatchScan().filter(filter).planFiles();
+
+ // handle cases where the filter cannot be applied because of partition evolution
+ return CloseableIterable.filter(
+ files,
+ t -> {
+ StructLike filePartition = ((PositionDeletesScanTask) t).partition();
+ String filePartitionValue = filePartition.get(0, String.class);
+ return filePartitionValue != null && filePartitionValue.equals(partitionValue);
+ });
+ }
}
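
For reference, the rewrite flow these tests exercise can be condensed into the sketch below. It only uses APIs that appear in the diff above (ScanTaskSetManager, SparkReadOptions.SCAN_TASK_SET_ID, SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, PositionDeletesRewriteCoordinator); the variables tab, posDeletesTable, posDeletesTableName, and spark are assumed to be set up as in the test class, so treat this as an illustrative outline rather than a definitive implementation.

    // Assumes: Table tab, Table posDeletesTable (POSITION_DELETES metadata table),
    // String posDeletesTableName, SparkSession spark -- as in the tests above.
    String fileSetId = UUID.randomUUID().toString();

    // 1. Stage the position delete scan tasks to rewrite under a shared fileset id
    List<ScanTask> tasks = Lists.newArrayList(posDeletesTable.newBatchScan().planFiles());
    ScanTaskSetManager.get().stageTasks(tab, fileSetId, tasks);

    // 2. Read back exactly the staged tasks and append them as new position delete files
    Dataset<Row> scanDF =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetId)
            .load(posDeletesTableName);
    scanDF
        .writeTo(posDeletesTableName)
        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetId)
        .append();

    // 3. Swap the old delete files for the newly written ones in a rewrite commit
    Set<DeleteFile> rewritten =
        ScanTaskSetManager.get().fetchTasks(posDeletesTable, fileSetId).stream()
            .map(t -> ((PositionDeletesScanTask) t).file())
            .collect(Collectors.toSet());
    Set<DeleteFile> added =
        PositionDeletesRewriteCoordinator.get().fetchNewFiles(posDeletesTable, fileSetId);
    tab.newRewrite().rewriteFiles(ImmutableSet.of(), rewritten, ImmutableSet.of(), added).commit();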