Posted to commits@iceberg.apache.org by sz...@apache.org on 2022/03/11 23:21:51 UTC
[iceberg] branch master updated: Core: Add delete_files metadata table (#4243)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ad1e634 Core: Add delete_files metadata table (#4243)
ad1e634 is described below
commit ad1e634a4806de2d420d18e7c0e82b74409cfe69
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Fri Mar 11 15:21:40 2022 -0800
Core: Add delete_files metadata table (#4243)
---
.../java/org/apache/iceberg/AllDataFilesTable.java | 4 +-
.../{DataFilesTable.java => BaseFilesTable.java} | 107 ++++++-----
.../java/org/apache/iceberg/DataFilesTable.java | 116 ++----------
.../java/org/apache/iceberg/DeleteFilesTable.java | 67 +++++++
.../java/org/apache/iceberg/MetadataTableType.java | 1 +
.../org/apache/iceberg/MetadataTableUtils.java | 2 +
.../java/org/apache/iceberg/TableTestBase.java | 32 ++--
.../org/apache/iceberg/TestMetadataTableScans.java | 203 ++++++++++++++++++++-
.../apache/iceberg/hadoop/HadoopTableTestBase.java | 9 +
.../iceberg/hadoop/TestTableSerialization.java | 8 +
.../spark/extensions/TestMetadataTables.java | 177 ++++++++++++++++++
11 files changed, 545 insertions(+), 181 deletions(-)
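For context before the diff: the new delete_files metadata table is queried like the existing files table. A minimal Spark usage sketch in the style of the Spark test at the end of this patch (the table name db.sample and the SparkSession argument are illustrative assumptions, not part of the commit; delete files only exist for format-version 2 tables):

import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class DeleteFilesExample {
  static List<Row> deleteFiles(SparkSession spark) {
    // Each row describes one delete file tracked by the current snapshot
    // (content 1 = position deletes, 2 = equality deletes).
    return spark.sql(
        "SELECT content, file_path, record_count FROM db.sample.delete_files")
        .collectAsList();
  }
}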
diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index d5db8cc..4eca0b4 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.BaseFilesTable.ManifestReadTask;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -118,7 +119,8 @@ public class AllDataFilesTable extends BaseMetadataTable {
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
return CloseableIterable.transform(manifests, manifest ->
- new DataFilesTable.ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
+ new ManifestReadTask(ops.io(), ops.current().specsById(), manifest, schema(),
+ schemaString, specString, residuals));
}
}
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
similarity index 61%
copy from core/src/main/java/org/apache/iceberg/DataFilesTable.java
copy to core/src/main/java/org/apache/iceberg/BaseFilesTable.java
index 31c0bea..a0b835a4 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
@@ -19,6 +19,8 @@
package org.apache.iceberg;
+import java.util.List;
+import java.util.Map;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
@@ -33,24 +35,15 @@ import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.StructType;
/**
- * A {@link Table} implementation that exposes a table's data files as rows.
+ * Base class for metadata tables that expose a table's files as rows
*/
-public class DataFilesTable extends BaseMetadataTable {
+abstract class BaseFilesTable extends BaseMetadataTable {
- DataFilesTable(TableOperations ops, Table table) {
- this(ops, table, table.name() + ".files");
- }
-
- DataFilesTable(TableOperations ops, Table table, String name) {
+ BaseFilesTable(TableOperations ops, Table table, String name) {
super(ops, table, name);
}
@Override
- public TableScan newScan() {
- return new FilesTableScan(operations(), table(), schema());
- }
-
- @Override
public Schema schema() {
StructType partitionType = Partitioning.partitionType(table());
Schema schema = new Schema(DataFile.getType(partitionType).fields());
@@ -62,90 +55,108 @@ public class DataFilesTable extends BaseMetadataTable {
}
}
- @Override
- MetadataTableType metadataTableType() {
- return MetadataTableType.FILES;
- }
-
- public static class FilesTableScan extends BaseMetadataTableScan {
+ abstract static class BaseFilesTableScan extends BaseMetadataTableScan {
private final Schema fileSchema;
+ private final MetadataTableType type;
- FilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
+ protected BaseFilesTableScan(TableOperations ops, Table table, Schema fileSchema, MetadataTableType type) {
super(ops, table, fileSchema);
this.fileSchema = fileSchema;
+ this.type = type;
}
- private FilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
- TableScanContext context) {
+ protected BaseFilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
+ TableScanContext context, MetadataTableType type) {
super(ops, table, schema, context);
this.fileSchema = fileSchema;
+ this.type = type;
+ }
+
+ protected Schema fileSchema() {
+ return fileSchema;
}
@Override
public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
throw new UnsupportedOperationException(
- String.format("Cannot incrementally scan table of type %s", MetadataTableType.FILES.name()));
+ String.format("Cannot incrementally scan table of type %s", type.name()));
}
@Override
public TableScan appendsAfter(long fromSnapshotId) {
throw new UnsupportedOperationException(
- String.format("Cannot incrementally scan table of type %s", MetadataTableType.FILES.name()));
+ String.format("Cannot incrementally scan table of type %s", type.name()));
}
@Override
- protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
- return new FilesTableScan(ops, table, schema, fileSchema, context);
- }
-
- @Override
- protected CloseableIterable<FileScanTask> planFiles(
- TableOperations ops, Snapshot snapshot, Expression rowFilter,
+ protected CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot, Expression rowFilter,
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
- CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.dataManifests());
+ CloseableIterable<ManifestFile> filtered = filterManifests(manifests(), rowFilter, caseSensitive);
+
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
- // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
- Expression partitionFilter = Projections
- .inclusive(
- transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX),
- caseSensitive)
- .project(rowFilter);
-
- ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
- partitionFilter, table().spec(), caseSensitive);
- CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests, manifestEval::eval);
-
// Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
// This data task needs to use the table schema, which may not include a partition schema to avoid having an
// empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
// all cases.
return CloseableIterable.transform(filtered, manifest ->
- new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
+ new ManifestReadTask(ops.io(), ops.current().specsById(),
+ manifest, schema(), schemaString, specString, residuals));
+ }
+
+ /**
+ * @return list of manifest files to explore for this files metadata table scan
+ */
+ protected abstract List<ManifestFile> manifests();
+
+ private CloseableIterable<ManifestFile> filterManifests(List<ManifestFile> manifests,
+ Expression rowFilter,
+ boolean caseSensitive) {
+ CloseableIterable<ManifestFile> manifestIterable = CloseableIterable.withNoopClose(manifests);
+
+ // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
+ PartitionSpec spec = transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX);
+ Expression partitionFilter = Projections.inclusive(spec, caseSensitive).project(rowFilter);
+
+ ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
+ partitionFilter, table().spec(), caseSensitive);
+
+ return CloseableIterable.filter(manifestIterable, manifestEval::eval);
}
}
static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final FileIO io;
+ private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
private final Schema schema;
- ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
- String specString, ResidualEvaluator residuals) {
+ ManifestReadTask(FileIO io, Map<Integer, PartitionSpec> specsById, ManifestFile manifest,
+ Schema schema, String schemaString, String specString, ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
this.io = io;
+ this.specsById = specsById;
this.manifest = manifest;
this.schema = schema;
}
@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(
- ManifestFiles.read(manifest, io).project(schema),
- file -> (GenericDataFile) file);
+ return CloseableIterable.transform(manifestEntries(), file -> (StructLike) file);
+ }
+
+ private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+ switch (manifest.content()) {
+ case DATA:
+ return ManifestFiles.read(manifest, io, specsById).project(schema);
+ case DELETES:
+ return ManifestFiles.readDeleteManifest(manifest, io, specsById).project(schema);
+ default:
+ throw new IllegalArgumentException("Unsupported manifest content type:" + manifest.content());
+ }
}
@Override
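A note on the filterManifests logic above: an inclusive projection turns the row filter on the partition.* columns into a filter on partition values, and a ManifestEvaluator then skips any manifest whose partition summaries cannot match. Below is a standalone sketch of that pruning idea against a table's own spec; it bypasses the PARTITION_FIELD_PREFIX renaming done by the package-private transformSpec helper, and the data_bucket partition column is an assumption borrowed from the tests.

import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;

class ManifestPruningSketch {
  // True if the manifest's partition summaries may contain data_bucket = 0;
  // false means the whole manifest can be skipped without being opened.
  static boolean mayContainBucket0(Table table, ManifestFile manifest) {
    PartitionSpec spec = table.spec();
    // The inclusive projection keeps only the parts of the row filter that
    // constrain partition values.
    Expression partitionFilter = Projections.inclusive(spec, true /* caseSensitive */)
        .project(Expressions.equal("data_bucket", 0));
    return ManifestEvaluator.forPartitionFilter(partitionFilter, spec, true).eval(manifest);
  }
}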
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 31c0bea..cc35205 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -19,23 +19,12 @@
package org.apache.iceberg;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.Projections;
-import org.apache.iceberg.expressions.ResidualEvaluator;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types.StructType;
+import java.util.List;
/**
* A {@link Table} implementation that exposes a table's data files as rows.
*/
-public class DataFilesTable extends BaseMetadataTable {
+public class DataFilesTable extends BaseFilesTable {
DataFilesTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".files");
@@ -47,19 +36,7 @@ public class DataFilesTable extends BaseMetadataTable {
@Override
public TableScan newScan() {
- return new FilesTableScan(operations(), table(), schema());
- }
-
- @Override
- public Schema schema() {
- StructType partitionType = Partitioning.partitionType(table());
- Schema schema = new Schema(DataFile.getType(partitionType).fields());
- if (partitionType.fields().size() < 1) {
- // avoid returning an empty struct, which is not always supported. instead, drop the partition field
- return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
- } else {
- return schema;
- }
+ return new DataFilesTableScan(operations(), table(), schema());
}
@Override
@@ -67,95 +44,24 @@ public class DataFilesTable extends BaseMetadataTable {
return MetadataTableType.FILES;
}
- public static class FilesTableScan extends BaseMetadataTableScan {
- private final Schema fileSchema;
+ public static class DataFilesTableScan extends BaseFilesTableScan {
- FilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
- super(ops, table, fileSchema);
- this.fileSchema = fileSchema;
+ DataFilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
+ super(ops, table, fileSchema, MetadataTableType.FILES);
}
- private FilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
- TableScanContext context) {
- super(ops, table, schema, context);
- this.fileSchema = fileSchema;
- }
-
- @Override
- public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
- throw new UnsupportedOperationException(
- String.format("Cannot incrementally scan table of type %s", MetadataTableType.FILES.name()));
- }
-
- @Override
- public TableScan appendsAfter(long fromSnapshotId) {
- throw new UnsupportedOperationException(
- String.format("Cannot incrementally scan table of type %s", MetadataTableType.FILES.name()));
+ DataFilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema, TableScanContext context) {
+ super(ops, table, schema, fileSchema, context, MetadataTableType.FILES);
}
@Override
protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
- return new FilesTableScan(ops, table, schema, fileSchema, context);
+ return new DataFilesTableScan(ops, table, schema, fileSchema(), context);
}
@Override
- protected CloseableIterable<FileScanTask> planFiles(
- TableOperations ops, Snapshot snapshot, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
- CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.dataManifests());
- String schemaString = SchemaParser.toJson(schema());
- String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
- Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
- ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
-
- // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
- Expression partitionFilter = Projections
- .inclusive(
- transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX),
- caseSensitive)
- .project(rowFilter);
-
- ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
- partitionFilter, table().spec(), caseSensitive);
- CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests, manifestEval::eval);
-
- // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
- // This data task needs to use the table schema, which may not include a partition schema to avoid having an
- // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
- // all cases.
- return CloseableIterable.transform(filtered, manifest ->
- new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
- }
- }
-
- static class ManifestReadTask extends BaseFileScanTask implements DataTask {
- private final FileIO io;
- private final ManifestFile manifest;
- private final Schema schema;
-
- ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
- String specString, ResidualEvaluator residuals) {
- super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
- this.io = io;
- this.manifest = manifest;
- this.schema = schema;
- }
-
- @Override
- public CloseableIterable<StructLike> rows() {
- return CloseableIterable.transform(
- ManifestFiles.read(manifest, io).project(schema),
- file -> (GenericDataFile) file);
- }
-
- @Override
- public Iterable<FileScanTask> split(long splitSize) {
- return ImmutableList.of(this); // don't split
- }
-
- @VisibleForTesting
- ManifestFile manifest() {
- return manifest;
+ protected List<ManifestFile> manifests() {
+ return snapshot().dataManifests();
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java b/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java
new file mode 100644
index 0000000..201c76d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+
+/**
+ * A {@link Table} implementation that exposes a table's delete files as rows.
+ */
+public class DeleteFilesTable extends BaseFilesTable {
+
+ DeleteFilesTable(TableOperations ops, Table table) {
+ this(ops, table, table.name() + ".delete_files");
+ }
+
+ DeleteFilesTable(TableOperations ops, Table table, String name) {
+ super(ops, table, name);
+ }
+
+ @Override
+ public TableScan newScan() {
+ return new DeleteFilesTableScan(operations(), table(), schema());
+ }
+
+ @Override
+ MetadataTableType metadataTableType() {
+ return MetadataTableType.DELETE_FILES;
+ }
+
+ public static class DeleteFilesTableScan extends BaseFilesTableScan {
+
+ DeleteFilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
+ super(ops, table, fileSchema, MetadataTableType.DELETE_FILES);
+ }
+
+ DeleteFilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema, TableScanContext context) {
+ super(ops, table, schema, fileSchema, context, MetadataTableType.DELETE_FILES);
+ }
+
+ @Override
+ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ return new DeleteFilesTableScan(ops, table, schema, fileSchema(), context);
+ }
+
+ @Override
+ protected List<ManifestFile> manifests() {
+ return snapshot().deleteManifests();
+ }
+ }
+}
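A minimal sketch of reaching the new table through the core Java API rather than SQL. It assumes an already-loaded v2 Table and relies on the createMetadataTableInstance(Table, MetadataTableType) overload in MetadataTableUtils, the class changed just below:

import java.io.IOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class DeleteFilesScanSketch {
  static void printDeleteFilePaths(Table table) throws IOException {
    // Wrap the base table with its delete_files metadata table; each planned
    // task reads one delete manifest and returns its delete files as rows.
    Table deleteFilesTable =
        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.DELETE_FILES);
    try (CloseableIterable<FileScanTask> tasks = deleteFilesTable.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path());
      }
    }
  }
}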
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableType.java b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
index 93e53b9..b173c0b 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableType.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
@@ -24,6 +24,7 @@ import java.util.Locale;
public enum MetadataTableType {
ENTRIES,
FILES,
+ DELETE_FILES,
HISTORY,
SNAPSHOTS,
MANIFESTS,
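The new enum constant is what makes the delete_files suffix resolvable by name. A one-line sketch, assuming the from(String) lookup helper defined elsewhere in this file (consistent with the java.util.Locale import visible in the hunk context):

// Resolves by upper-casing the suffix, so "delete_files" maps to DELETE_FILES.
MetadataTableType type = MetadataTableType.from("delete_files");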
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
index afb1619..52a429c 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
@@ -55,6 +55,8 @@ public class MetadataTableUtils {
return new ManifestEntriesTable(ops, baseTable, metadataTableName);
case FILES:
return new DataFilesTable(ops, baseTable, metadataTableName);
+ case DELETE_FILES:
+ return new DeleteFilesTable(ops, baseTable, metadataTableName);
case HISTORY:
return new HistoryTable(ops, baseTable, metadataTableName);
case SNAPSHOTS:
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 82e6076..5abfb2c 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -107,34 +107,24 @@ public class TableTestBase {
.withPartitionPath("data_bucket=2") // easy way to set partition data for now
.withRecordCount(1)
.build();
- static final DataFile FILE_D = DataFiles.builder(SPEC)
- .withPath("/path/to/data-d.parquet")
- .withFileSizeInBytes(10)
- .withPartitionPath("data_bucket=3") // easy way to set partition data for now
- .withRecordCount(1)
- .build();
- static final DataFile FILE_PARTITION_0 = DataFiles.builder(SPEC)
- .withPath("/path/to/data-0.parquet")
- .withFileSizeInBytes(10)
- .withPartition(TestHelpers.Row.of(0))
- .withRecordCount(1)
- .build();
- static final DataFile FILE_PARTITION_1 = DataFiles.builder(SPEC)
- .withPath("/path/to/data-1.parquet")
+ static final DeleteFile FILE_C2_DELETES = FileMetadata.deleteFileBuilder(SPEC)
+ .ofEqualityDeletes(1)
+ .withPath("/path/to/data-c-deletes.parquet")
.withFileSizeInBytes(10)
- .withPartition(TestHelpers.Row.of(1))
+ .withPartitionPath("data_bucket=2") // easy way to set partition data for now
.withRecordCount(1)
.build();
- static final DataFile FILE_PARTITION_2 = DataFiles.builder(SPEC)
- .withPath("/path/to/data-2.parquet")
+ static final DataFile FILE_D = DataFiles.builder(SPEC)
+ .withPath("/path/to/data-d.parquet")
.withFileSizeInBytes(10)
- .withPartition(TestHelpers.Row.of(2))
+ .withPartitionPath("data_bucket=3") // easy way to set partition data for now
.withRecordCount(1)
.build();
- static final DataFile FILE_PARTITION_3 = DataFiles.builder(SPEC)
- .withPath("/path/to/data-3.parquet")
+ static final DeleteFile FILE_D2_DELETES = FileMetadata.deleteFileBuilder(SPEC)
+ .ofEqualityDeletes(1)
+ .withPath("/path/to/data-d-deletes.parquet")
.withFileSizeInBytes(10)
- .withPartition(TestHelpers.Row.of(3))
+ .withPartitionPath("data_bucket=3") // easy way to set partition data for now
.withRecordCount(1)
.build();
static final DataFile FILE_WITH_STATS = DataFiles.builder(SPEC)
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 1bb8f28..40f0989 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;
+import org.apache.iceberg.BaseFilesTable.ManifestReadTask;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
@@ -32,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -53,17 +55,32 @@ public class TestMetadataTableScans extends TableTestBase {
private void preparePartitionedTable() {
table.newFastAppend()
- .appendFile(FILE_PARTITION_0)
+ .appendFile(FILE_A)
.commit();
table.newFastAppend()
- .appendFile(FILE_PARTITION_1)
+ .appendFile(FILE_C)
.commit();
table.newFastAppend()
- .appendFile(FILE_PARTITION_2)
+ .appendFile(FILE_D)
.commit();
table.newFastAppend()
- .appendFile(FILE_PARTITION_3)
+ .appendFile(FILE_B)
.commit();
+
+ if (formatVersion == 2) {
+ table.newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .commit();
+ table.newRowDelta()
+ .addDeletes(FILE_B_DELETES)
+ .commit();
+ table.newRowDelta()
+ .addDeletes(FILE_C2_DELETES)
+ .commit();
+ table.newRowDelta()
+ .addDeletes(FILE_D2_DELETES)
+ .commit();
+ }
}
@Test
@@ -463,6 +480,180 @@ public class TestMetadataTableScans extends TableTestBase {
}
@Test
+ public void testDeleteFilesTableScanNoFilter() {
+ Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+ preparePartitionedTable();
+
+ Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+ Types.StructType expected = new Schema(
+ required(102, "partition", Types.StructType.of(
+ optional(1000, "data_bucket", Types.IntegerType.get())),
+ "Partition data tuple, schema based on the partition spec")).asStruct();
+
+ TableScan scanNoFilter = deleteFilesTable.newScan().select("partition.data_bucket");
+ Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+ CloseableIterable<FileScanTask> tasks = scanNoFilter.planFiles();
+
+ Assert.assertEquals(4, Iterables.size(tasks));
+ validateFileScanTasks(tasks, 0);
+ validateFileScanTasks(tasks, 1);
+ validateFileScanTasks(tasks, 2);
+ validateFileScanTasks(tasks, 3);
+ }
+
+ @Test
+ public void testDeleteFilesTableScanAndFilter() {
+ Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+ preparePartitionedTable();
+
+ Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+ Expression andEquals = Expressions.and(
+ Expressions.equal("partition.data_bucket", 0),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scan = deleteFilesTable.newScan().filter(andEquals);
+ CloseableIterable<FileScanTask> tasks = scan.planFiles();
+ Assert.assertEquals(1, Iterables.size(tasks));
+ validateFileScanTasks(tasks, 0);
+ }
+
+ @Test
+ public void testDeleteFilesTableScanAndFilterWithPlanTasks() {
+ Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+ preparePartitionedTable();
+
+ Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+ Expression andEquals = Expressions.and(
+ Expressions.equal("partition.data_bucket", 0),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scan = deleteFilesTable.newScan().filter(andEquals);
+ CloseableIterable<CombinedScanTask> tasks = scan.planTasks();
+ Assert.assertEquals(1, Iterables.size(tasks));
+ validateCombinedScanTasks(tasks, 0);
+ }
+
+ @Test
+ public void testDeleteFilesTableScanLtFilter() {
+ Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+ preparePartitionedTable();
+
+ Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+ Expression lt = Expressions.lessThan("partition.data_bucket", 2);
+ TableScan scan = deleteFilesTable.newScan().filter(lt);
+ CloseableIterable<FileScanTask> tasksLt = scan.planFiles();
+ Assert.assertEquals(2, Iterables.size(tasksLt));
+ validateFileScanTasks(tasksLt, 0);
+ validateFileScanTasks(tasksLt, 1);
+ }
+
+ @Test
+ public void testDeleteFilesTableScanOrFilter() {
+ Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+ preparePartitionedTable();
+
+ Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+ Expression or = Expressions.or(
+ Expressions.equal("partition.data_bucket", 2),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scan = deleteFilesTable.newScan()
+ .filter(or);
+ CloseableIterable<FileScanTask> tasksOr = scan.planFiles();
+ Assert.assertEquals(4, Iterables.size(tasksOr));
+ validateFileScanTasks(tasksOr, 0);
+ validateFileScanTasks(tasksOr, 1);
+ validateFileScanTasks(tasksOr, 2);
+ validateFileScanTasks(tasksOr, 3);
+ }
+
+ @Test
+ public void testDeleteFilesScanNotFilter() {
+ Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+ preparePartitionedTable();
+ Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+ Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
+ TableScan scan = deleteFilesTable.newScan()
+ .filter(not);
+ CloseableIterable<FileScanTask> tasksNot = scan.planFiles();
+ Assert.assertEquals(2, Iterables.size(tasksNot));
+ validateFileScanTasks(tasksNot, 2);
+ validateFileScanTasks(tasksNot, 3);
+ }
+
+ @Test
+ public void testDeleteFilesTableScanInFilter() {
+ Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+ preparePartitionedTable();
+
+ Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+ Expression set = Expressions.in("partition.data_bucket", 2, 3);
+ TableScan scan = deleteFilesTable.newScan()
+ .filter(set);
+ CloseableIterable<FileScanTask> tasksIn = scan.planFiles();
+ Assert.assertEquals(2, Iterables.size(tasksIn));
+
+ validateFileScanTasks(tasksIn, 2);
+ validateFileScanTasks(tasksIn, 3);
+ }
+
+ @Test
+ public void testDeleteFilesTableScanNotNullFilter() {
+ Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+ preparePartitionedTable();
+
+ Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+ Expression notNull = Expressions.notNull("partition.data_bucket");
+ TableScan scan = deleteFilesTable.newScan()
+ .filter(notNull);
+ CloseableIterable<FileScanTask> tasksNotNull = scan.planFiles();
+ Assert.assertEquals(4, Iterables.size(tasksNotNull));
+
+ validateFileScanTasks(tasksNotNull, 0);
+ validateFileScanTasks(tasksNotNull, 1);
+ validateFileScanTasks(tasksNotNull, 2);
+ validateFileScanTasks(tasksNotNull, 3);
+ }
+
+ @Test
+ public void testDeleteFilesTableSelection() throws IOException {
+ Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ table.newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .addDeletes(FILE_A2_DELETES)
+ .commit();
+
+ Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+ TableScan scan = deleteFilesTable.newScan()
+ .filter(Expressions.equal("record_count", 1))
+ .select("content", "record_count");
+ validateTaskScanResiduals(scan, false);
+ Types.StructType expected = new Schema(
+ optional(134, "content", Types.IntegerType.get(),
+ "Contents of the file: 0=data, 1=position deletes, 2=equality deletes"),
+ required(103, "record_count", Types.LongType.get(), "Number of records in the file")
+ ).asStruct();
+ Assert.assertEquals(expected, scan.schema().asStruct());
+ }
+
+ @Test
public void testPartitionColumnNamedPartition() throws Exception {
TestTables.clearTables();
this.tableDir = temp.newFolder();
@@ -601,14 +792,14 @@ public class TestMetadataTableScans extends TableTestBase {
private void validateFileScanTasks(CloseableIterable<FileScanTask> fileScanTasks, int partValue) {
Assert.assertTrue("File scan tasks do not include correct file",
StreamSupport.stream(fileScanTasks.spliterator(), false).anyMatch(t -> {
- ManifestFile mf = ((DataFilesTable.ManifestReadTask) t).manifest();
+ ManifestFile mf = ((ManifestReadTask) t).manifest();
return manifestHasPartition(mf, partValue);
}));
}
private void validateCombinedScanTasks(CloseableIterable<CombinedScanTask> tasks, int partValue) {
StreamSupport.stream(tasks.spliterator(), false)
- .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).manifest()))
+ .flatMap(c -> c.files().stream().map(t -> ((ManifestReadTask) t).manifest()))
.anyMatch(m -> manifestHasPartition(m, partValue));
}
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
index 0c21148..29643ab 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -87,6 +89,13 @@ public class HadoopTableTestBase {
.withPartitionPath("data_bucket=1") // easy way to set partition data for now
.withRecordCount(2) // needs at least one record or else metrics will filter it out
.build();
+ static final DeleteFile FILE_B_DELETES = FileMetadata.deleteFileBuilder(SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-b-deletes.parquet")
+ .withFileSizeInBytes(0)
+ .withPartitionPath("data_bucket=1")
+ .withRecordCount(1)
+ .build();
static final DataFile FILE_C = DataFiles.builder(SPEC)
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(0)
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
index 1086ba0..4159ce6 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.io.CloseableIterable;
@@ -120,6 +121,10 @@ public class TestTableSerialization extends HadoopTableTestBase {
@Test
public void testSerializableMetadataTablesPlanning() throws IOException {
+ table.updateProperties()
+ .set(TableProperties.FORMAT_VERSION, "2")
+ .commit();
+
table.newAppend()
.appendFile(FILE_A)
.commit();
@@ -138,6 +143,9 @@ public class TestTableSerialization extends HadoopTableTestBase {
table.newAppend()
.appendFile(FILE_B)
.commit();
+ table.newRowDelta()
+ .addDeletes(FILE_B_DELETES)
+ .commit();
for (MetadataTableType type : MetadataTableType.values()) {
// Collect the deserialized data
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
new file mode 100644
index 0000000..7a8bd41
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestMetadataTables extends SparkExtensionsTestBase {
+
+ public TestMetadataTables(String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testDeleteFilesTable() throws Exception {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES" +
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c"),
+ new SimpleRecord(4, "d")
+ );
+ spark.createDataset(records, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ sql("DELETE FROM %s WHERE id=1", tableName);
+
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ List<ManifestFile> expectedManifests = TestHelpers.deleteManifests(table);
+ Assert.assertEquals("Should have 1 delete manifest", 1, expectedManifests.size());
+
+ Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+ Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".delete_files").schema();
+
+ List<Row> actual = spark.sql("SELECT * FROM " + tableName + ".delete_files").collectAsList();
+
+ List<Record> expected = expectedEntries(table, entriesTableSchema, expectedManifests, null);
+
+ Assert.assertEquals("Should be one delete file manifest entry", 1, expected.size());
+ Assert.assertEquals("Metadata table should return one delete file", 1, actual.size());
+
+ TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expected.get(0), actual.get(0));
+ }
+
+ @Test
+ public void testDeleteFilesTablePartitioned() throws Exception {
+ sql("CREATE TABLE %s (id bigint, data string) " +
+ "USING iceberg " +
+ "PARTITIONED BY (data) " +
+ "TBLPROPERTIES" +
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);
+
+ List<SimpleRecord> recordsA = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "a")
+ );
+ spark.createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ List<SimpleRecord> recordsB = Lists.newArrayList(
+ new SimpleRecord(1, "b"),
+ new SimpleRecord(2, "b")
+ );
+ spark.createDataset(recordsB, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ sql("DELETE FROM %s WHERE id=1 AND data='a'", tableName);
+ sql("DELETE FROM %s WHERE id=1 AND data='b'", tableName);
+
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ List<ManifestFile> expectedManifests = TestHelpers.deleteManifests(table);
+ Assert.assertEquals("Should have 2 delete files", 2, expectedManifests.size());
+
+ List<Row> actual = spark.sql("SELECT * FROM " + tableName + ".delete_files " +
+ "WHERE partition.data='a'").collectAsList();
+
+ Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+ Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".delete_files").schema();
+
+ List<Record> expected = expectedEntries(table, entriesTableSchema, expectedManifests, "a");
+
+ Assert.assertEquals("Should be one delete file manifest entry", 1, expected.size());
+ Assert.assertEquals("Metadata table should return one delete file", 1, actual.size());
+
+ TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expected.get(0), actual.get(0));
+ }
+
+ /**
+ * Find matching manifest entries of an Iceberg table
+ * @param table iceberg table
+ * @param entriesTableSchema schema of Manifest entries
+ * @param manifestsToExplore manifests to explore of the table
+ * @param partValue partition value that manifest entries must match, or null to skip filtering
+ */
+ private List<Record> expectedEntries(Table table, Schema entriesTableSchema,
+ List<ManifestFile> manifestsToExplore, String partValue) throws IOException {
+ List<Record> expected = Lists.newArrayList();
+ for (ManifestFile manifest : manifestsToExplore) {
+ InputFile in = table.io().newInputFile(manifest.path());
+ try (CloseableIterable<Record> rows = Avro.read(in).project(entriesTableSchema).build()) {
+ for (Record record : rows) {
+ if ((Integer) record.get("status") < 2 /* added or existing */) {
+ Record file = (Record) record.get("data_file");
+ if (partitionMatch(file, partValue)) {
+ asDeleteRecords(file);
+ expected.add(file);
+ }
+ }
+ }
+ }
+ }
+ return expected;
+ }
+
+ // Populate certain fields derived in the metadata tables
+ private void asDeleteRecords(Record file) {
+ file.put(0, FileContent.POSITION_DELETES.id());
+ file.put(3, 0); // specId
+ }
+
+ private boolean partitionMatch(Record file, String partValue) {
+ if (partValue == null) {
+ return true;
+ }
+ Record partition = (Record) file.get(4);
+ return partValue.equals(partition.get(0).toString());
+ }
+}