Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/08/10 17:28:16 UTC
[iceberg] branch master updated: Spark 3.3: Use typed beans in BaseSparkAction (#5469)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4c5dc6fb6 Spark 3.3: Use typed beans in BaseSparkAction (#5469)
4c5dc6fb6 is described below
commit 4c5dc6fb6e2685221c66f8eb7762d5f8d8786d75
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Aug 10 10:28:11 2022 -0700
Spark 3.3: Use typed beans in BaseSparkAction (#5469)
---
.../iceberg/spark/actions/BaseSparkAction.java | 111 ++++++++-------------
.../actions/DeleteOrphanFilesSparkAction.java | 13 ++-
.../actions/DeleteReachableFilesSparkAction.java | 28 +++---
.../spark/actions/ExpireSnapshotsSparkAction.java | 62 ++++++++----
.../org/apache/iceberg/spark/actions/FileInfo.java | 52 ++++++++++
.../iceberg/spark/actions/ManifestFileBean.java | 4 +
.../spark/actions/TestExpireSnapshotsAction.java | 43 +++++++-
7 files changed, 205 insertions(+), 108 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index e9012a6d7..728d21df0 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
@@ -59,12 +60,10 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Tuple2;
abstract class BaseSparkAction<ThisT> {
@@ -135,9 +134,10 @@ abstract class BaseSparkAction<ThisT> {
}
// builds a DF of delete and data file path and type by reading all manifests
- protected Dataset<Row> buildValidContentFileWithTypeDF(Table table) {
- Broadcast<Table> tableBroadcast =
- sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ protected Dataset<FileInfo> contentFileDS(Table table) {
+ Table serializableTable = SerializableTableWithSize.copyOf(table);
+ Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+ int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
Dataset<ManifestFileBean> allManifests =
loadMetadataTable(table, ALL_MANIFESTS)
@@ -148,68 +148,47 @@ abstract class BaseSparkAction<ThisT> {
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
.dropDuplicates("path")
- .repartition(
- spark
- .sessionState()
- .conf()
- .numShufflePartitions()) // avoid adaptive execution combining tasks
- .as(Encoders.bean(ManifestFileBean.class));
-
- return allManifests
- .flatMap(
- new ReadManifest(tableBroadcast), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
- .toDF(FILE_PATH, FILE_TYPE);
- }
+ .repartition(numShufflePartitions) // avoid adaptive execution combining tasks
+ .as(ManifestFileBean.ENCODER);
- // builds a DF of delete and data file paths by reading all manifests
- protected Dataset<Row> buildValidContentFileDF(Table table) {
- return buildValidContentFileWithTypeDF(table).select(FILE_PATH);
+ return allManifests.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}
- protected Dataset<Row> buildManifestFileDF(Table table) {
- return loadMetadataTable(table, ALL_MANIFESTS).select(col("path").as(FILE_PATH));
+ protected Dataset<FileInfo> manifestDS(Table table) {
+ return loadMetadataTable(table, ALL_MANIFESTS)
+ .select(col("path"), lit(MANIFEST).as("type"))
+ .as(FileInfo.ENCODER);
}
- protected Dataset<Row> buildManifestListDF(Table table) {
+ protected Dataset<FileInfo> manifestListDS(Table table) {
List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
- return spark.createDataset(manifestLists, Encoders.STRING()).toDF(FILE_PATH);
+ return toFileInfoDS(manifestLists, MANIFEST_LIST);
}
- protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
- return buildOtherMetadataFileDF(
- table, false /* include all reachable previous metadata locations */);
+ protected Dataset<FileInfo> otherMetadataFileDS(Table table) {
+ return otherMetadataFileDS(table, false /* include all reachable old metadata locations */);
}
- protected Dataset<Row> buildAllReachableOtherMetadataFileDF(Table table) {
- return buildOtherMetadataFileDF(
- table, true /* include all reachable previous metadata locations */);
+ protected Dataset<FileInfo> allReachableOtherMetadataFileDS(Table table) {
+ return otherMetadataFileDS(table, true /* include all reachable old metadata locations */);
}
- private Dataset<Row> buildOtherMetadataFileDF(
- Table table, boolean includePreviousMetadataLocations) {
+ private Dataset<FileInfo> otherMetadataFileDS(Table table, boolean recursive) {
List<String> otherMetadataFiles = Lists.newArrayList();
- otherMetadataFiles.addAll(
- ReachableFileUtil.metadataFileLocations(table, includePreviousMetadataLocations));
+ otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, recursive));
otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
- return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF(FILE_PATH);
- }
-
- protected Dataset<Row> buildValidMetadataFileDF(Table table) {
- Dataset<Row> manifestDF = buildManifestFileDF(table);
- Dataset<Row> manifestListDF = buildManifestListDF(table);
- Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(table);
-
- return manifestDF.union(otherMetadataFileDF).union(manifestListDF);
- }
-
- protected Dataset<Row> withFileType(Dataset<Row> ds, String type) {
- return ds.withColumn(FILE_TYPE, lit(type));
+ return toFileInfoDS(otherMetadataFiles, OTHERS);
}
protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
return SparkTableUtil.loadMetadataTable(spark, table, type);
}
+ private Dataset<FileInfo> toFileInfoDS(List<String> paths, String type) {
+ List<FileInfo> fileInfoList = Lists.transform(paths, path -> new FileInfo(path, type));
+ return spark.createDataset(fileInfoList, FileInfo.ENCODER);
+ }
+
/**
* Deletes files and keeps track of how many files were removed for each file type.
*
@@ -219,7 +198,7 @@ abstract class BaseSparkAction<ThisT> {
* @return stats on which files were deleted
*/
protected DeleteSummary deleteFiles(
- ExecutorService executorService, Consumer<String> deleteFunc, Iterator<Row> files) {
+ ExecutorService executorService, Consumer<String> deleteFunc, Iterator<FileInfo> files) {
DeleteSummary summary = new DeleteSummary();
@@ -230,14 +209,14 @@ abstract class BaseSparkAction<ThisT> {
.executeWith(executorService)
.onFailure(
(fileInfo, exc) -> {
- String path = fileInfo.getString(0);
- String type = fileInfo.getString(1);
+ String path = fileInfo.getPath();
+ String type = fileInfo.getType();
LOG.warn("Delete failed for {}: {}", type, path, exc);
})
.run(
fileInfo -> {
- String path = fileInfo.getString(0);
- String type = fileInfo.getString(1);
+ String path = fileInfo.getPath();
+ String type = fileInfo.getType();
deleteFunc.accept(path);
summary.deletedFile(path, type);
});
@@ -317,8 +296,7 @@ abstract class BaseSparkAction<ThisT> {
}
}
- private static class ReadManifest
- implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+ private static class ReadManifest implements FlatMapFunction<ManifestFileBean, FileInfo> {
private final Broadcast<Table> table;
ReadManifest(Broadcast<Table> table) {
@@ -326,33 +304,32 @@ abstract class BaseSparkAction<ThisT> {
}
@Override
- public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+ public Iterator<FileInfo> call(ManifestFileBean manifest) {
return new ClosingIterator<>(entries(manifest));
}
- public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
+ public CloseableIterator<FileInfo> entries(ManifestFileBean manifest) {
+ ManifestContent content = manifest.content();
FileIO io = table.getValue().io();
Map<Integer, PartitionSpec> specs = table.getValue().specs();
- ImmutableList<String> projection =
- ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
+ List<String> proj = ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
- switch (manifest.content()) {
+ switch (content) {
case DATA:
return CloseableIterator.transform(
- ManifestFiles.read(manifest, io, specs).select(projection).iterator(),
- ReadManifest::contentFileWithType);
+ ManifestFiles.read(manifest, io, specs).select(proj).iterator(),
+ ReadManifest::toFileInfo);
case DELETES:
return CloseableIterator.transform(
- ManifestFiles.readDeleteManifest(manifest, io, specs).select(projection).iterator(),
- ReadManifest::contentFileWithType);
+ ManifestFiles.readDeleteManifest(manifest, io, specs).select(proj).iterator(),
+ ReadManifest::toFileInfo);
default:
- throw new IllegalArgumentException(
- "Unsupported manifest content type:" + manifest.content());
+ throw new IllegalArgumentException("Unsupported manifest content type:" + content);
}
}
- static Tuple2<String, String> contentFileWithType(ContentFile<?> file) {
- return new Tuple2<>(file.path().toString(), file.content().toString());
+ static FileInfo toFileInfo(ContentFile<?> file) {
+ return new FileInfo(file.path().toString(), file.content().toString());
}
}
}
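[Editor's sketch, not part of the commit] The manifestDS and contentFileDS changes above both lean on the fact that Encoders.bean binds columns to bean properties by name, which is why manifestDS aliases the literal type column to "type" before .as(FileInfo.ENCODER). A minimal runnable illustration follows; the package line, local session, sample path, and the "Manifest" literal (assumed to be the value of the MANIFEST constant) are illustrative assumptions:

package org.apache.iceberg.spark.actions;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class FileInfoBindingSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();

    // Stand-in for the "path" column read from the ALL_MANIFESTS metadata table.
    Dataset<String> paths =
        spark.createDataset(Collections.singletonList("metadata/m1.avro"), Encoders.STRING());

    // Bean encoders bind by column name: alias the string column to "path" and
    // the literal to "type" so they match FileInfo's getters, as manifestDS does.
    Dataset<FileInfo> manifests =
        paths.select(col("value").as("path"), lit("Manifest").as("type"))
            .as(FileInfo.ENCODER);

    manifests.show(false);
    spark.stop();
  }
}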
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 7df3eaf94..527783b9c 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -238,9 +238,7 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
}
private DeleteOrphanFiles.Result doExecute() {
- Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
- Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
- Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
+ Dataset<Row> validFileDF = buildValidFileDF();
Dataset<Row> actualFileDF =
compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
@@ -258,6 +256,15 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
return new BaseDeleteOrphanFilesActionResult(orphanFiles);
}
+ private Dataset<Row> buildValidFileDF() {
+ return contentFileDS(table)
+ .union(manifestDS(table))
+ .union(manifestListDS(table))
+ .union(otherMetadataFileDS(table))
+ .toDF(FILE_PATH, FILE_TYPE)
+ .select(FILE_PATH);
+ }
+
private Dataset<Row> buildActualFileDF() {
List<String> subDirs = Lists.newArrayList();
List<String> matchingFiles = Lists.newArrayList();
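[Editor's sketch, not part of the commit] The new buildValidFileDF unions four typed datasets, renames the bean columns, and projects the path. A runnable sketch of that union-then-project shape, assuming the same package as FileInfo, sample values, and that FILE_PATH/FILE_TYPE resolve to "file_path"/"file_type":

package org.apache.iceberg.spark.actions;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ValidFileUnionSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();

    Dataset<FileInfo> content =
        spark.createDataset(
            Arrays.asList(new FileInfo("data/a.parquet", "DATA")), FileInfo.ENCODER);
    Dataset<FileInfo> metadata =
        spark.createDataset(
            Arrays.asList(new FileInfo("metadata/m1.avro", "Manifest")), FileInfo.ENCODER);

    // union() resolves by position; both inputs share the FileInfo schema, so
    // the result stays typed until toDF renames the columns and the final
    // select drops the type, matching buildValidFileDF above.
    Dataset<Row> validFiles =
        content.union(metadata).toDF("file_path", "file_type").select("file_path");

    validFiles.show(false);
    spark.stop();
  }
}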
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
index 41127f7c7..0f01afa28 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,26 +109,29 @@ public class DeleteReachableFilesSparkAction
PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
"Cannot delete files: GC is disabled (deleting files may corrupt other tables)");
- Dataset<Row> reachableFileDF = buildReachableFileDF(metadata).distinct();
+ Dataset<FileInfo> reachableFileDS = reachableFileDS(metadata);
- boolean streamResults =
- PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
- if (streamResults) {
- return deleteFiles(reachableFileDF.toLocalIterator());
+ if (streamResults()) {
+ return deleteFiles(reachableFileDS.toLocalIterator());
} else {
- return deleteFiles(reachableFileDF.collectAsList().iterator());
+ return deleteFiles(reachableFileDS.collectAsList().iterator());
}
}
- private Dataset<Row> buildReachableFileDF(TableMetadata metadata) {
+ private boolean streamResults() {
+ return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
+ }
+
+ private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, io);
- return buildValidContentFileWithTypeDF(staticTable)
- .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
- .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST))
- .union(withFileType(buildAllReachableOtherMetadataFileDF(staticTable), OTHERS));
+ return contentFileDS(staticTable)
+ .union(manifestDS(staticTable))
+ .union(manifestListDS(staticTable))
+ .union(allReachableOtherMetadataFileDS(staticTable))
+ .distinct();
}
- private DeleteReachableFiles.Result deleteFiles(Iterator<Row> files) {
+ private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
LOG.info("Deleted {} total files", summary.totalFilesCount());
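[Editor's sketch, not part of the commit] One detail behind the .distinct() that moved into reachableFileDS: on a typed Dataset it deduplicates on the encoded columns rather than on Java equals/hashCode, which is why FileInfo needs neither. A small illustration, with the package line and the "Others" literal (assumed value of the OTHERS constant) as assumptions:

package org.apache.iceberg.spark.actions;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class TypedDistinctSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();

    Dataset<FileInfo> files =
        spark.createDataset(
            Arrays.asList(
                new FileInfo("metadata/v1.metadata.json", "Others"),
                new FileInfo("metadata/v1.metadata.json", "Others")), // duplicate row
            FileInfo.ENCODER);

    // distinct() compares the encoded (path, type) columns, so the duplicate
    // bean collapses even though FileInfo does not override equals/hashCode.
    System.out.println(files.distinct().count()); // prints 1

    spark.stop();
  }
}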
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 13402d40d..46e645519 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -42,6 +42,8 @@ import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.execution.QueryExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +86,7 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
private Integer retainLastValue = null;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;
- private Dataset<Row> expiredFiles = null;
+ private Dataset<FileInfo> expiredFileDS = null;
ExpireSnapshotsSparkAction(SparkSession spark, Table table) {
super(spark);
@@ -140,19 +142,35 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
*
* <p>This does not delete data files. To delete data files, run {@link #execute()}.
*
- * <p>This may be called before or after {@link #execute()} is called to return the expired file
- * list.
+ * <p>This may be called before or after {@link #execute()} to return the expired files.
*
* @return a Dataset of files that are no longer referenced by the table
+ * @deprecated since 1.0.0, will be removed in 1.1.0; use {@link #expireFiles()} instead.
*/
+ @Deprecated
public Dataset<Row> expire() {
- if (expiredFiles == null) {
+ // rely on the same query execution to reuse shuffles
+ QueryExecution queryExecution = expireFiles().queryExecution();
+ return new Dataset<>(queryExecution, RowEncoder.apply(queryExecution.analyzed().schema()));
+ }
+
+ /**
+ * Expires snapshots and commits the changes to the table, returning a Dataset of files to delete.
+ *
+ * <p>This does not delete data files. To delete data files, run {@link #execute()}.
+ *
+ * <p>This may be called before or after {@link #execute()} to return the expired files.
+ *
+ * @return a Dataset of files that are no longer referenced by the table
+ */
+ public Dataset<FileInfo> expireFiles() {
+ if (expiredFileDS == null) {
// fetch metadata before expiration
- Dataset<Row> originalFiles = buildValidFileDF(ops.current());
+ Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
// perform expiration
- org.apache.iceberg.ExpireSnapshots expireSnapshots =
- table.expireSnapshots().cleanExpiredFiles(false);
+ org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots();
+
for (long id : expiredSnapshotIds) {
expireSnapshots = expireSnapshots.expireSnapshotId(id);
}
@@ -165,16 +183,16 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
expireSnapshots = expireSnapshots.retainLast(retainLastValue);
}
- expireSnapshots.commit();
+ expireSnapshots.cleanExpiredFiles(false).commit();
// fetch metadata after expiration
- Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
+ Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
// determine expired files
- this.expiredFiles = originalFiles.except(validFiles);
+ this.expiredFileDS = originalFileDS.except(validFileDS);
}
- return expiredFiles;
+ return expiredFileDS;
}
@Override
@@ -209,23 +227,25 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
}
private ExpireSnapshots.Result doExecute() {
- boolean streamResults =
- PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
- if (streamResults) {
- return deleteFiles(expire().toLocalIterator());
+ if (streamResults()) {
+ return deleteFiles(expireFiles().toLocalIterator());
} else {
- return deleteFiles(expire().collectAsList().iterator());
+ return deleteFiles(expireFiles().collectAsList().iterator());
}
}
- private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
+ private boolean streamResults() {
+ return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
+ }
+
+ private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, table.io());
- return buildValidContentFileWithTypeDF(staticTable)
- .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
- .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
+ return contentFileDS(staticTable)
+ .union(manifestDS(staticTable))
+ .union(manifestListDS(staticTable));
}
- private ExpireSnapshots.Result deleteFiles(Iterator<Row> files) {
+ private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
LOG.info("Deleted {} total files", summary.totalFilesCount());
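[Editor's sketch, not part of the commit] Caller-side view of the migration this file sets up: expire() is now a deprecated shim that wraps the same QueryExecution as expireFiles(), so calling both does not recompute the expiration. The API calls are taken from the diff and the test below; the table handle and timestamp are assumed to exist:

package org.apache.iceberg.spark.actions;

import org.apache.iceberg.Table;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ExpireMigrationSketch {
  // 'table' is assumed to be a loaded Iceberg table with expirable snapshots.
  static void expire(Table table, long olderThanMillis) {
    ExpireSnapshotsSparkAction action = SparkActions.get().expireSnapshots(table);
    action.expireOlderThan(olderThanMillis).retainLast(2);

    // Preferred typed API added by this commit.
    Dataset<FileInfo> expired = action.expireFiles();

    // Deprecated Row view; it reuses expireFiles()'s query execution, so the
    // expiration commit and shuffles are shared rather than re-run.
    Dataset<Row> expiredRows = action.expire();

    System.out.println(expired.count() + " expired files (typed)");
    System.out.println(expiredRows.count() + " expired files (deprecated view)");
  }
}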
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/FileInfo.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/FileInfo.java
new file mode 100644
index 000000000..51ff7c80f
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/FileInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+
+public class FileInfo {
+ public static final Encoder<FileInfo> ENCODER = Encoders.bean(FileInfo.class);
+
+ private String path;
+ private String type;
+
+ public FileInfo(String path, String type) {
+ this.path = path;
+ this.type = type;
+ }
+
+ public FileInfo() {}
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+}
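[Editor's sketch, not part of the commit] The shape of this class is dictated by Encoders.bean: the reflective bean encoder requires a public no-arg constructor plus getter/setter pairs (the no-arg constructor is what deserialization uses), and it derives the Spark schema from those pairs. A quick way to inspect the derived schema:

package org.apache.iceberg.spark.actions;

import org.apache.spark.sql.Encoder;

public class FileInfoSchemaSketch {
  public static void main(String[] args) {
    // Derived once at class-load time from FileInfo's getPath()/getType():
    //   path: string, type: string
    Encoder<FileInfo> encoder = FileInfo.ENCODER;
    System.out.println(encoder.schema().treeString());
  }
}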
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
index 1f82eabc6..45647070e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
@@ -22,8 +22,12 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
public class ManifestFileBean implements ManifestFile {
+ public static final Encoder<ManifestFileBean> ENCODER = Encoders.bean(ManifestFileBean.class);
+
private String path = null;
private Long length = null;
private Integer partitionSpecId = null;
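[Editor's sketch, not part of the commit] Both beans now expose the derived encoder as a static constant, so the Encoders.bean reflection runs once per class instead of at every call site. A minimal illustration of the idiom with a hypothetical bean (names illustrative, not from the commit):

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public class EncoderConstantSketch {
  // A simple bean following the same contract as ManifestFileBean.
  public static class PathBean {
    private String path;

    public String getPath() {
      return path;
    }

    public void setPath(String path) {
      this.path = path;
    }
  }

  // Derived once, reused by every .as(PathBean.ENCODER)-style call site.
  public static final Encoder<PathBean> ENCODER = Encoders.bean(PathBean.class);

  public static void main(String[] args) {
    System.out.println(ENCODER.schema().treeString());
  }
}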
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 6c6240a3b..b2421abfb 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1176,10 +1176,10 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
Assert.assertEquals("Should not delete any files", 0, deletedFiles.size());
- Assert.assertSame(
- "Multiple calls to expire should return the same deleted files",
- pendingDeletes,
- action.expire());
+ Assert.assertEquals(
+ "Multiple calls to expire should return the same count of deleted files",
+ pendingDeletes.count(),
+ action.expire().count());
}
@Test
@@ -1215,4 +1215,39 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
jobsRunDuringStreamResults);
});
}
+
+ @Test
+ public void testExpireAfterExecute() {
+ table
+ .newAppend()
+ .appendFile(FILE_A) // data_bucket=0
+ .commit();
+
+ rightAfterSnapshot();
+
+ table
+ .newAppend()
+ .appendFile(FILE_B) // data_bucket=1
+ .commit();
+
+ table
+ .newAppend()
+ .appendFile(FILE_C) // data_bucket=2
+ .commit();
+
+ long t3 = rightAfterSnapshot();
+
+ ExpireSnapshotsSparkAction action = SparkActions.get().expireSnapshots(table);
+
+ action.expireOlderThan(t3).retainLast(2);
+
+ ExpireSnapshots.Result result = action.execute();
+ checkExpirationResults(0L, 0L, 0L, 0L, 1L, result);
+
+ List<FileInfo> typedExpiredFiles = action.expireFiles().collectAsList();
+ Assert.assertEquals("Expired results must match", 1, typedExpiredFiles.size());
+
+ List<Row> untypedExpiredFiles = action.expire().collectAsList();
+ Assert.assertEquals("Expired results must match", 1, untypedExpiredFiles.size());
+ }
}