You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/08/10 17:28:16 UTC

[iceberg] branch master updated: Spark 3.3: Use typed beans in BaseSparkAction (#5469)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c5dc6fb6 Spark 3.3: Use typed beans in BaseSparkAction (#5469)
4c5dc6fb6 is described below

commit 4c5dc6fb6e2685221c66f8eb7762d5f8d8786d75
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Aug 10 10:28:11 2022 -0700

    Spark 3.3: Use typed beans in BaseSparkAction (#5469)
---
 .../iceberg/spark/actions/BaseSparkAction.java     | 111 ++++++++-------------
 .../actions/DeleteOrphanFilesSparkAction.java      |  13 ++-
 .../actions/DeleteReachableFilesSparkAction.java   |  28 +++---
 .../spark/actions/ExpireSnapshotsSparkAction.java  |  62 ++++++++----
 .../org/apache/iceberg/spark/actions/FileInfo.java |  52 ++++++++++
 .../iceberg/spark/actions/ManifestFileBean.java    |   4 +
 .../spark/actions/TestExpireSnapshotsAction.java   |  43 +++++++-
 7 files changed, 205 insertions(+), 108 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index e9012a6d7..728d21df0 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
@@ -59,12 +60,10 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
 abstract class BaseSparkAction<ThisT> {
 
@@ -135,9 +134,10 @@ abstract class BaseSparkAction<ThisT> {
   }
 
   // builds a DF of delete and data file path and type by reading all manifests
-  protected Dataset<Row> buildValidContentFileWithTypeDF(Table table) {
-    Broadcast<Table> tableBroadcast =
-        sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+  protected Dataset<FileInfo> contentFileDS(Table table) {
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
 
     Dataset<ManifestFileBean> allManifests =
         loadMetadataTable(table, ALL_MANIFESTS)
@@ -148,68 +148,47 @@ abstract class BaseSparkAction<ThisT> {
                 "partition_spec_id as partitionSpecId",
                 "added_snapshot_id as addedSnapshotId")
             .dropDuplicates("path")
-            .repartition(
-                spark
-                    .sessionState()
-                    .conf()
-                    .numShufflePartitions()) // avoid adaptive execution combining tasks
-            .as(Encoders.bean(ManifestFileBean.class));
-
-    return allManifests
-        .flatMap(
-            new ReadManifest(tableBroadcast), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
-        .toDF(FILE_PATH, FILE_TYPE);
-  }
+            .repartition(numShufflePartitions) // avoid adaptive execution combining tasks
+            .as(ManifestFileBean.ENCODER);
 
-  // builds a DF of delete and data file paths by reading all manifests
-  protected Dataset<Row> buildValidContentFileDF(Table table) {
-    return buildValidContentFileWithTypeDF(table).select(FILE_PATH);
+    return allManifests.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
   }
 
-  protected Dataset<Row> buildManifestFileDF(Table table) {
-    return loadMetadataTable(table, ALL_MANIFESTS).select(col("path").as(FILE_PATH));
+  protected Dataset<FileInfo> manifestDS(Table table) {
+    return loadMetadataTable(table, ALL_MANIFESTS)
+        .select(col("path"), lit(MANIFEST).as("type"))
+        .as(FileInfo.ENCODER);
   }
 
-  protected Dataset<Row> buildManifestListDF(Table table) {
+  protected Dataset<FileInfo> manifestListDS(Table table) {
     List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
-    return spark.createDataset(manifestLists, Encoders.STRING()).toDF(FILE_PATH);
+    return toFileInfoDS(manifestLists, MANIFEST_LIST);
   }
 
-  protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
-    return buildOtherMetadataFileDF(
-        table, false /* include all reachable previous metadata locations */);
+  protected Dataset<FileInfo> otherMetadataFileDS(Table table) {
+    return otherMetadataFileDS(table, false /* include all reachable old metadata locations */);
   }
 
-  protected Dataset<Row> buildAllReachableOtherMetadataFileDF(Table table) {
-    return buildOtherMetadataFileDF(
-        table, true /* include all reachable previous metadata locations */);
+  protected Dataset<FileInfo> allReachableOtherMetadataFileDS(Table table) {
+    return otherMetadataFileDS(table, true /* include all reachable old metadata locations */);
   }
 
-  private Dataset<Row> buildOtherMetadataFileDF(
-      Table table, boolean includePreviousMetadataLocations) {
+  private Dataset<FileInfo> otherMetadataFileDS(Table table, boolean recursive) {
     List<String> otherMetadataFiles = Lists.newArrayList();
-    otherMetadataFiles.addAll(
-        ReachableFileUtil.metadataFileLocations(table, includePreviousMetadataLocations));
+    otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, recursive));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
-    return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF(FILE_PATH);
-  }
-
-  protected Dataset<Row> buildValidMetadataFileDF(Table table) {
-    Dataset<Row> manifestDF = buildManifestFileDF(table);
-    Dataset<Row> manifestListDF = buildManifestListDF(table);
-    Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(table);
-
-    return manifestDF.union(otherMetadataFileDF).union(manifestListDF);
-  }
-
-  protected Dataset<Row> withFileType(Dataset<Row> ds, String type) {
-    return ds.withColumn(FILE_TYPE, lit(type));
+    return toFileInfoDS(otherMetadataFiles, OTHERS);
   }
 
   protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
+  private Dataset<FileInfo> toFileInfoDS(List<String> paths, String type) {
+    List<FileInfo> fileInfoList = Lists.transform(paths, path -> new FileInfo(path, type));
+    return spark.createDataset(fileInfoList, FileInfo.ENCODER);
+  }
+
   /**
    * Deletes files and keeps track of how many files were removed for each file type.
    *
@@ -219,7 +198,7 @@ abstract class BaseSparkAction<ThisT> {
    * @return stats on which files were deleted
    */
   protected DeleteSummary deleteFiles(
-      ExecutorService executorService, Consumer<String> deleteFunc, Iterator<Row> files) {
+      ExecutorService executorService, Consumer<String> deleteFunc, Iterator<FileInfo> files) {
 
     DeleteSummary summary = new DeleteSummary();
 
@@ -230,14 +209,14 @@ abstract class BaseSparkAction<ThisT> {
         .executeWith(executorService)
         .onFailure(
             (fileInfo, exc) -> {
-              String path = fileInfo.getString(0);
-              String type = fileInfo.getString(1);
+              String path = fileInfo.getPath();
+              String type = fileInfo.getType();
               LOG.warn("Delete failed for {}: {}", type, path, exc);
             })
         .run(
             fileInfo -> {
-              String path = fileInfo.getString(0);
-              String type = fileInfo.getString(1);
+              String path = fileInfo.getPath();
+              String type = fileInfo.getType();
               deleteFunc.accept(path);
               summary.deletedFile(path, type);
             });
@@ -317,8 +296,7 @@ abstract class BaseSparkAction<ThisT> {
     }
   }
 
-  private static class ReadManifest
-      implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, FileInfo> {
     private final Broadcast<Table> table;
 
     ReadManifest(Broadcast<Table> table) {
@@ -326,33 +304,32 @@ abstract class BaseSparkAction<ThisT> {
     }
 
     @Override
-    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+    public Iterator<FileInfo> call(ManifestFileBean manifest) {
       return new ClosingIterator<>(entries(manifest));
     }
 
-    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
+    public CloseableIterator<FileInfo> entries(ManifestFileBean manifest) {
+      ManifestContent content = manifest.content();
       FileIO io = table.getValue().io();
       Map<Integer, PartitionSpec> specs = table.getValue().specs();
-      ImmutableList<String> projection =
-          ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
+      List<String> proj = ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
 
-      switch (manifest.content()) {
+      switch (content) {
         case DATA:
           return CloseableIterator.transform(
-              ManifestFiles.read(manifest, io, specs).select(projection).iterator(),
-              ReadManifest::contentFileWithType);
+              ManifestFiles.read(manifest, io, specs).select(proj).iterator(),
+              ReadManifest::toFileInfo);
         case DELETES:
           return CloseableIterator.transform(
-              ManifestFiles.readDeleteManifest(manifest, io, specs).select(projection).iterator(),
-              ReadManifest::contentFileWithType);
+              ManifestFiles.readDeleteManifest(manifest, io, specs).select(proj).iterator(),
+              ReadManifest::toFileInfo);
         default:
-          throw new IllegalArgumentException(
-              "Unsupported manifest content type:" + manifest.content());
+          throw new IllegalArgumentException("Unsupported manifest content type:" + content);
       }
     }
 
-    static Tuple2<String, String> contentFileWithType(ContentFile<?> file) {
-      return new Tuple2<>(file.path().toString(), file.content().toString());
+    static FileInfo toFileInfo(ContentFile<?> file) {
+      return new FileInfo(file.path().toString(), file.content().toString());
     }
   }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 7df3eaf94..527783b9c 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -238,9 +238,7 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
   }
 
   private DeleteOrphanFiles.Result doExecute() {
-    Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
-    Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
-    Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
+    Dataset<Row> validFileDF = buildValidFileDF();
     Dataset<Row> actualFileDF =
         compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
 
@@ -258,6 +256,15 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
+  private Dataset<Row> buildValidFileDF() {
+    return contentFileDS(table)
+        .union(manifestDS(table))
+        .union(manifestListDS(table))
+        .union(otherMetadataFileDS(table))
+        .toDF(FILE_PATH, FILE_TYPE)
+        .select(FILE_PATH);
+  }
+
   private Dataset<Row> buildActualFileDF() {
     List<String> subDirs = Lists.newArrayList();
     List<String> matchingFiles = Lists.newArrayList();
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
index 41127f7c7..0f01afa28 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,26 +109,29 @@ public class DeleteReachableFilesSparkAction
         PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
         "Cannot delete files: GC is disabled (deleting files may corrupt other tables)");
 
-    Dataset<Row> reachableFileDF = buildReachableFileDF(metadata).distinct();
+    Dataset<FileInfo> reachableFileDS = reachableFileDS(metadata);
 
-    boolean streamResults =
-        PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
-    if (streamResults) {
-      return deleteFiles(reachableFileDF.toLocalIterator());
+    if (streamResults()) {
+      return deleteFiles(reachableFileDS.toLocalIterator());
     } else {
-      return deleteFiles(reachableFileDF.collectAsList().iterator());
+      return deleteFiles(reachableFileDS.collectAsList().iterator());
     }
   }
 
-  private Dataset<Row> buildReachableFileDF(TableMetadata metadata) {
+  private boolean streamResults() {
+    return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
+  }
+
+  private Dataset<FileInfo> reachableFileDS(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, io);
-    return buildValidContentFileWithTypeDF(staticTable)
-        .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
-        .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST))
-        .union(withFileType(buildAllReachableOtherMetadataFileDF(staticTable), OTHERS));
+    return contentFileDS(staticTable)
+        .union(manifestDS(staticTable))
+        .union(manifestListDS(staticTable))
+        .union(allReachableOtherMetadataFileDS(staticTable))
+        .distinct();
   }
 
-  private DeleteReachableFiles.Result deleteFiles(Iterator<Row> files) {
+  private DeleteReachableFiles.Result deleteFiles(Iterator<FileInfo> files) {
     DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
     LOG.info("Deleted {} total files", summary.totalFilesCount());
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 13402d40d..46e645519 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -42,6 +42,8 @@ import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.execution.QueryExecution;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,7 +86,7 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
   private Integer retainLastValue = null;
   private Consumer<String> deleteFunc = defaultDelete;
   private ExecutorService deleteExecutorService = null;
-  private Dataset<Row> expiredFiles = null;
+  private Dataset<FileInfo> expiredFileDS = null;
 
   ExpireSnapshotsSparkAction(SparkSession spark, Table table) {
     super(spark);
@@ -140,19 +142,35 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
    *
    * <p>This does not delete data files. To delete data files, run {@link #execute()}.
    *
-   * <p>This may be called before or after {@link #execute()} is called to return the expired file
-   * list.
+   * <p>This may be called before or after {@link #execute()} to return the expired files.
    *
    * @return a Dataset of files that are no longer referenced by the table
+   * @deprecated since 1.0.0, will be removed in 1.1.0; use {@link #expireFiles()} instead.
    */
+  @Deprecated
   public Dataset<Row> expire() {
-    if (expiredFiles == null) {
+    // rely on the same query execution to reuse shuffles
+    QueryExecution queryExecution = expireFiles().queryExecution();
+    return new Dataset<>(queryExecution, RowEncoder.apply(queryExecution.analyzed().schema()));
+  }
+
+  /**
+   * Expires snapshots and commits the changes to the table, returning a Dataset of files to delete.
+   *
+   * <p>This does not delete data files. To delete data files, run {@link #execute()}.
+   *
+   * <p>This may be called before or after {@link #execute()} to return the expired files.
+   *
+   * @return a Dataset of files that are no longer referenced by the table
+   */
+  public Dataset<FileInfo> expireFiles() {
+    if (expiredFileDS == null) {
       // fetch metadata before expiration
-      Dataset<Row> originalFiles = buildValidFileDF(ops.current());
+      Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
 
       // perform expiration
-      org.apache.iceberg.ExpireSnapshots expireSnapshots =
-          table.expireSnapshots().cleanExpiredFiles(false);
+      org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots();
+
       for (long id : expiredSnapshotIds) {
         expireSnapshots = expireSnapshots.expireSnapshotId(id);
       }
@@ -165,16 +183,16 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
         expireSnapshots = expireSnapshots.retainLast(retainLastValue);
       }
 
-      expireSnapshots.commit();
+      expireSnapshots.cleanExpiredFiles(false).commit();
 
       // fetch metadata after expiration
-      Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
+      Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
 
       // determine expired files
-      this.expiredFiles = originalFiles.except(validFiles);
+      this.expiredFileDS = originalFileDS.except(validFileDS);
     }
 
-    return expiredFiles;
+    return expiredFileDS;
   }
 
   @Override
@@ -209,23 +227,25 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
   }
 
   private ExpireSnapshots.Result doExecute() {
-    boolean streamResults =
-        PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
-    if (streamResults) {
-      return deleteFiles(expire().toLocalIterator());
+    if (streamResults()) {
+      return deleteFiles(expireFiles().toLocalIterator());
     } else {
-      return deleteFiles(expire().collectAsList().iterator());
+      return deleteFiles(expireFiles().collectAsList().iterator());
     }
   }
 
-  private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
+  private boolean streamResults() {
+    return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
+  }
+
+  private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, table.io());
-    return buildValidContentFileWithTypeDF(staticTable)
-        .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
-        .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
+    return contentFileDS(staticTable)
+        .union(manifestDS(staticTable))
+        .union(manifestListDS(staticTable));
   }
 
-  private ExpireSnapshots.Result deleteFiles(Iterator<Row> files) {
+  private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
     DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
     LOG.info("Deleted {} total files", summary.totalFilesCount());
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/FileInfo.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/FileInfo.java
new file mode 100644
index 000000000..51ff7c80f
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/FileInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+
+public class FileInfo {
+  public static final Encoder<FileInfo> ENCODER = Encoders.bean(FileInfo.class);
+
+  private String path;
+  private String type;
+
+  public FileInfo(String path, String type) {
+    this.path = path;
+    this.type = type;
+  }
+
+  public FileInfo() {}
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
index 1f82eabc6..45647070e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
@@ -22,8 +22,12 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
 
 public class ManifestFileBean implements ManifestFile {
+  public static final Encoder<ManifestFileBean> ENCODER = Encoders.bean(ManifestFileBean.class);
+
   private String path = null;
   private Long length = null;
   private Integer partitionSpecId = null;
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 6c6240a3b..b2421abfb 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1176,10 +1176,10 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Assert.assertEquals("Should not delete any files", 0, deletedFiles.size());
 
-    Assert.assertSame(
-        "Multiple calls to expire should return the same deleted files",
-        pendingDeletes,
-        action.expire());
+    Assert.assertEquals(
+        "Multiple calls to expire should return the same count of deleted files",
+        pendingDeletes.count(),
+        action.expire().count());
   }
 
   @Test
@@ -1215,4 +1215,39 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
               jobsRunDuringStreamResults);
         });
   }
+
+  @Test
+  public void testExpireAfterExecute() {
+    table
+        .newAppend()
+        .appendFile(FILE_A) // data_bucket=0
+        .commit();
+
+    rightAfterSnapshot();
+
+    table
+        .newAppend()
+        .appendFile(FILE_B) // data_bucket=1
+        .commit();
+
+    table
+        .newAppend()
+        .appendFile(FILE_C) // data_bucket=2
+        .commit();
+
+    long t3 = rightAfterSnapshot();
+
+    ExpireSnapshotsSparkAction action = SparkActions.get().expireSnapshots(table);
+
+    action.expireOlderThan(t3).retainLast(2);
+
+    ExpireSnapshots.Result result = action.execute();
+    checkExpirationResults(0L, 0L, 0L, 0L, 1L, result);
+
+    List<FileInfo> typedExpiredFiles = action.expireFiles().collectAsList();
+    Assert.assertEquals("Expired results must match", 1, typedExpiredFiles.size());
+
+    List<Row> untypedExpiredFiles = action.expire().collectAsList();
+    Assert.assertEquals("Expired results must match", 1, untypedExpiredFiles.size());
+  }
 }