You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/08/18 16:22:45 UTC

[iceberg] branch master updated: Spark 3.3: Reduce serialization in DeleteOrphanFilesSparkAction (#5495)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c73c7729b2 Spark 3.3: Reduce serialization in DeleteOrphanFilesSparkAction (#5495)
c73c7729b2 is described below

commit c73c7729b20078d8a3c59be0f6c1c44635cbe090
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Aug 18 09:22:39 2022 -0700

    Spark 3.3: Reduce serialization in DeleteOrphanFilesSparkAction (#5495)
---
 .../iceberg/spark/actions/BaseSparkAction.java     |   6 +-
 .../actions/DeleteOrphanFilesSparkAction.java      | 377 +++++++++++++--------
 .../spark/actions/ExpireSnapshotsSparkAction.java  |   4 +-
 .../spark/actions/TestRemoveOrphanFilesAction.java |  10 +-
 4 files changed, 240 insertions(+), 157 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 728d21df06..a1dfa6009e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -47,6 +47,8 @@ import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -72,9 +74,11 @@ abstract class BaseSparkAction<ThisT> {
   protected static final String OTHERS = "Others";
 
   protected static final String FILE_PATH = "file_path";
-  protected static final String FILE_TYPE = "file_type";
   protected static final String LAST_MODIFIED = "last_modified";
 
+  protected static final Splitter COMMA_SPLITTER = Splitter.on(",");
+  protected static final Joiner COMMA_JOINER = Joiner.on(',');
+
   private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class);
   private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
   private static final int DELETE_NUM_RETRIES = 3;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 527783b9c8..1abd2107ed 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -23,6 +23,7 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.net.URI;
 import java.sql.Timestamp;
 import java.util.Collections;
@@ -45,13 +46,10 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
@@ -67,6 +65,7 @@ import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -103,11 +102,14 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     implements DeleteOrphanFiles {
 
   private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final Splitter COMMA = Splitter.on(",");
   private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");
+  private static final int MAX_DRIVER_LISTING_DEPTH = 3;
+  private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10;
+  private static final int MAX_EXECUTOR_LISTING_DEPTH = 2000;
+  private static final int MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS = Integer.MAX_VALUE;
 
   private final SerializableConfiguration hadoopConf;
-  private final int partitionDiscoveryParallelism;
+  private final int listingParallelism;
   private final Table table;
   private final Consumer<String> defaultDelete =
       new Consumer<String>() {
@@ -130,8 +132,7 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     super(spark);
 
     this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
-    this.partitionDiscoveryParallelism =
-        spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
+    this.listingParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
     this.table = table;
     this.location = table.location();
 
@@ -211,14 +212,15 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     return this;
   }
 
-  private Dataset<Row> filteredCompareToFileList() {
+  private Dataset<String> filteredCompareToFileList() {
     Dataset<Row> files = compareToFileList;
     if (location != null) {
       files = files.filter(files.col(FILE_PATH).startsWith(location));
     }
     return files
         .filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp)))
-        .select(files.col(FILE_PATH));
+        .select(files.col(FILE_PATH))
+        .as(Encoders.STRING());
   }
 
   @Override
@@ -233,18 +235,16 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     if (location != null) {
       options.add("location=" + location);
     }
-    return String.format(
-        "Deleting orphan files (%s) from %s", Joiner.on(',').join(options), table.name());
+    String optionsAsString = COMMA_JOINER.join(options);
+    return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name());
   }
 
   private DeleteOrphanFiles.Result doExecute() {
-    Dataset<Row> validFileDF = buildValidFileDF();
-    Dataset<Row> actualFileDF =
-        compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
+    Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
+    Dataset<FileURI> validFileIdentDS = validFileIdentDS();
 
     List<String> orphanFiles =
-        findOrphanFiles(
-            spark(), actualFileDF, validFileDF, equalSchemes, equalAuthorities, prefixMismatchMode);
+        findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
 
     Tasks.foreach(orphanFiles)
         .noRetry()
@@ -256,41 +256,64 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
-  private Dataset<Row> buildValidFileDF() {
-    return contentFileDS(table)
-        .union(manifestDS(table))
-        .union(manifestListDS(table))
-        .union(otherMetadataFileDS(table))
-        .toDF(FILE_PATH, FILE_TYPE)
-        .select(FILE_PATH);
+  private Dataset<FileURI> validFileIdentDS() {
+    // transform before union to avoid extra serialization/deserialization
+    FileInfoToFileURI toFileURI = new FileInfoToFileURI(equalSchemes, equalAuthorities);
+
+    Dataset<FileURI> contentFileIdentDS = toFileURI.apply(contentFileDS(table));
+    Dataset<FileURI> manifestFileIdentDS = toFileURI.apply(manifestDS(table));
+    Dataset<FileURI> manifestListIdentDS = toFileURI.apply(manifestListDS(table));
+    Dataset<FileURI> otherMetadataFileIdentDS = toFileURI.apply(otherMetadataFileDS(table));
+
+    return contentFileIdentDS
+        .union(manifestFileIdentDS)
+        .union(manifestListIdentDS)
+        .union(otherMetadataFileIdentDS);
+  }
+
+  private Dataset<FileURI> actualFileIdentDS() {
+    StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
+    if (compareToFileList == null) {
+      return toFileURI.apply(listedFileDS());
+    } else {
+      return toFileURI.apply(filteredCompareToFileList());
+    }
   }
 
-  private Dataset<Row> buildActualFileDF() {
+  private Dataset<String> listedFileDS() {
     List<String> subDirs = Lists.newArrayList();
     List<String> matchingFiles = Lists.newArrayList();
 
     Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
     PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
 
-    // list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver
+    // list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
+    // fewer than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
     listDirRecursively(
-        location, predicate, hadoopConf.value(), 3, 10, subDirs, pathFilter, matchingFiles);
+        location,
+        predicate,
+        hadoopConf.value(),
+        MAX_DRIVER_LISTING_DEPTH,
+        MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
+        subDirs,
+        pathFilter,
+        matchingFiles);
 
     JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
 
     if (subDirs.isEmpty()) {
-      return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
+      return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
     }
 
-    int parallelism = Math.min(subDirs.size(), partitionDiscoveryParallelism);
+    int parallelism = Math.min(subDirs.size(), listingParallelism);
     JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);
 
     Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
-    JavaRDD<String> matchingLeafFileRDD =
-        subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp, pathFilter));
+    ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
+    JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);
 
     JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
-    return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
+    return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
   }
 
   private static void listDirRecursively(
@@ -341,68 +364,26 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
             matchingFiles);
       }
     } catch (IOException e) {
-      throw new RuntimeIOException(e);
+      throw new UncheckedIOException(e);
     }
   }
 
-  private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
-      Broadcast<SerializableConfiguration> conf, long olderThanTimestamp, PathFilter pathFilter) {
-
-    return dirs -> {
-      List<String> subDirs = Lists.newArrayList();
-      List<String> files = Lists.newArrayList();
-
-      Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
-
-      int maxDepth = 2000;
-      int maxDirectSubDirs = Integer.MAX_VALUE;
-
-      dirs.forEachRemaining(
-          dir -> {
-            listDirRecursively(
-                dir,
-                predicate,
-                conf.value().value(),
-                maxDepth,
-                maxDirectSubDirs,
-                subDirs,
-                pathFilter,
-                files);
-          });
-
-      if (!subDirs.isEmpty()) {
-        throw new RuntimeException(
-            "Could not list subdirectories, reached maximum subdirectory depth: " + maxDepth);
-      }
-
-      return files.iterator();
-    };
-  }
-
   @VisibleForTesting
   static List<String> findOrphanFiles(
       SparkSession spark,
-      Dataset<Row> actualFileDF,
-      Dataset<Row> validFileDF,
-      Map<String, String> equalSchemes,
-      Map<String, String> equalAuthorities,
+      Dataset<FileURI> actualFileIdentDS,
+      Dataset<FileURI> validFileIdentDS,
       PrefixMismatchMode prefixMismatchMode) {
-    Dataset<FileMetadata> actualFileMetadataDS =
-        actualFileDF.mapPartitions(
-            toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
-    Dataset<FileMetadata> validFileMetadataDS =
-        validFileDF.mapPartitions(
-            toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
 
     SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
     spark.sparkContext().register(conflicts);
 
-    Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+    Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));
 
     List<String> orphanFiles =
-        actualFileMetadataDS
-            .joinWith(validFileMetadataDS, joinCond, "leftouter")
-            .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+        actualFileIdentDS
+            .joinWith(validFileIdentDS, joinCond, "leftouter")
+            .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
             .collectAsList();
 
     if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
@@ -426,7 +407,7 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     if (map != null) {
       for (String key : map.keySet()) {
         String value = map.get(key);
-        for (String splitKey : COMMA.split(key)) {
+        for (String splitKey : COMMA_SPLITTER.split(key)) {
           flattenedMap.put(splitKey.trim(), value.trim());
         }
       }
@@ -434,57 +415,150 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     return flattenedMap;
   }
 
-  private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
-      PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) {
-    return rows -> {
-      Iterator<String> transformed =
-          Iterators.transform(
-              rows,
-              row -> {
-                FileMetadata actual = row._1;
-                FileMetadata valid = row._2;
-
-                if (valid == null) {
-                  return actual.location;
-                }
-
-                boolean schemeMatch =
-                    Strings.isNullOrEmpty(valid.scheme)
-                        || valid.scheme.equalsIgnoreCase(actual.scheme);
-                boolean authorityMatch =
-                    Strings.isNullOrEmpty(valid.authority)
-                        || valid.authority.equalsIgnoreCase(actual.authority);
-
-                if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
-                  return actual.location;
-                } else {
-                  if (!schemeMatch) {
-                    conflicts.add(Pair.of(valid.scheme, actual.scheme));
-                  }
-                  if (!authorityMatch) {
-                    conflicts.add(Pair.of(valid.authority, actual.authority));
-                  }
-                }
-
-                return null;
-              });
-      return Iterators.filter(transformed, Objects::nonNull);
-    };
+  private static class ListDirsRecursively implements FlatMapFunction<Iterator<String>, String> {
+
+    private final Broadcast<SerializableConfiguration> hadoopConf;
+    private final long olderThanTimestamp;
+    private final PathFilter pathFilter;
+
+    ListDirsRecursively(
+        Broadcast<SerializableConfiguration> hadoopConf,
+        long olderThanTimestamp,
+        PathFilter pathFilter) {
+
+      this.hadoopConf = hadoopConf;
+      this.olderThanTimestamp = olderThanTimestamp;
+      this.pathFilter = pathFilter;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<String> dirs) throws Exception {
+      List<String> subDirs = Lists.newArrayList();
+      List<String> files = Lists.newArrayList();
+
+      Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
+
+      while (dirs.hasNext()) {
+        listDirRecursively(
+            dirs.next(),
+            predicate,
+            hadoopConf.value().value(),
+            MAX_EXECUTOR_LISTING_DEPTH,
+            MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS,
+            subDirs,
+            pathFilter,
+            files);
+      }
+
+      if (!subDirs.isEmpty()) {
+        throw new RuntimeException(
+            "Could not list sub directories, reached maximum depth: " + MAX_EXECUTOR_LISTING_DEPTH);
+      }
+
+      return files.iterator();
+    }
+  }
+
+  private static class FindOrphanFiles
+      implements MapPartitionsFunction<Tuple2<FileURI, FileURI>, String> {
+
+    private final PrefixMismatchMode mode;
+    private final SetAccumulator<Pair<String, String>> conflicts;
+
+    FindOrphanFiles(PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) {
+      this.mode = mode;
+      this.conflicts = conflicts;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<Tuple2<FileURI, FileURI>> rows) throws Exception {
+      Iterator<String> orphanFiles = Iterators.transform(rows, this::toOrphanFile);
+      return Iterators.filter(orphanFiles, Objects::nonNull);
+    }
+
+    private String toOrphanFile(Tuple2<FileURI, FileURI> row) {
+      FileURI actual = row._1;
+      FileURI valid = row._2;
+
+      if (valid == null) {
+        return actual.uriAsString;
+      }
+
+      boolean schemeMatch = uriComponentMatch(valid.scheme, actual.scheme);
+      boolean authorityMatch = uriComponentMatch(valid.authority, actual.authority);
+
+      if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
+        return actual.uriAsString;
+      } else {
+        if (!schemeMatch) {
+          conflicts.add(Pair.of(valid.scheme, actual.scheme));
+        }
+
+        if (!authorityMatch) {
+          conflicts.add(Pair.of(valid.authority, actual.authority));
+        }
+
+        return null;
+      }
+    }
+
+    private boolean uriComponentMatch(String valid, String actual) {
+      return Strings.isNullOrEmpty(valid) || valid.equalsIgnoreCase(actual);
+    }
+  }
+
+  @VisibleForTesting
+  static class StringToFileURI extends ToFileURI<String> {
+    StringToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
+      super(equalSchemes, equalAuthorities);
+    }
+
+    @Override
+    protected String uriAsString(String input) {
+      return input;
+    }
+  }
+
+  @VisibleForTesting
+  static class FileInfoToFileURI extends ToFileURI<FileInfo> {
+    FileInfoToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
+      super(equalSchemes, equalAuthorities);
+    }
+
+    @Override
+    protected String uriAsString(FileInfo fileInfo) {
+      return fileInfo.getPath();
+    }
   }
 
-  private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
-      Map<String, String> equalSchemesMap, Map<String, String> equalAuthoritiesMap) {
-    return rows ->
-        Iterators.transform(
-            rows,
-            row -> {
-              String location = row.getString(0);
-              URI uri = new Path(location).toUri();
-              String scheme = equalSchemesMap.getOrDefault(uri.getScheme(), uri.getScheme());
-              String authority =
-                  equalAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
-              return new FileMetadata(scheme, authority, uri.getPath(), location);
-            });
+  private abstract static class ToFileURI<I> implements MapPartitionsFunction<I, FileURI> {
+
+    private final Map<String, String> equalSchemes;
+    private final Map<String, String> equalAuthorities;
+
+    ToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
+      this.equalSchemes = equalSchemes;
+      this.equalAuthorities = equalAuthorities;
+    }
+
+    protected abstract String uriAsString(I input);
+
+    Dataset<FileURI> apply(Dataset<I> ds) {
+      return ds.mapPartitions(this, FileURI.ENCODER);
+    }
+
+    @Override
+    public Iterator<FileURI> call(Iterator<I> rows) throws Exception {
+      return Iterators.transform(rows, this::toFileURI);
+    }
+
+    private FileURI toFileURI(I input) {
+      String uriAsString = uriAsString(input);
+      URI uri = new Path(uriAsString).toUri();
+      String scheme = equalSchemes.getOrDefault(uri.getScheme(), uri.getScheme());
+      String authority = equalAuthorities.getOrDefault(uri.getAuthority(), uri.getAuthority());
+      return new FileURI(scheme, authority, uri.getPath(), uriAsString);
+    }
   }
 
   /**
@@ -503,9 +577,11 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
 
     @Override
     public boolean accept(Path path) {
-      boolean isHiddenPartitionPath =
-          hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
-      return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
+      return isHiddenPartitionPath(path) || HiddenPathFilter.get().accept(path);
+    }
+
+    private boolean isHiddenPartitionPath(Path path) {
+      return hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
     }
 
     static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
@@ -517,33 +593,34 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
           specs.values().stream()
               .map(PartitionSpec::fields)
               .flatMap(List::stream)
-              .filter(
-                  partitionField ->
-                      partitionField.name().startsWith("_")
-                          || partitionField.name().startsWith("."))
-              .map(partitionField -> partitionField.name() + "=")
+              .filter(field -> field.name().startsWith("_") || field.name().startsWith("."))
+              .map(field -> field.name() + "=")
               .collect(Collectors.toSet());
 
-      return partitionNames.isEmpty()
-          ? HiddenPathFilter.get()
-          : new PartitionAwareHiddenPathFilter(partitionNames);
+      if (partitionNames.isEmpty()) {
+        return HiddenPathFilter.get();
+      } else {
+        return new PartitionAwareHiddenPathFilter(partitionNames);
+      }
     }
   }
 
-  public static class FileMetadata implements Serializable {
+  public static class FileURI {
+    public static final Encoder<FileURI> ENCODER = Encoders.bean(FileURI.class);
+
     private String scheme;
     private String authority;
     private String path;
-    private String location;
+    private String uriAsString;
 
-    public FileMetadata(String scheme, String authority, String path, String location) {
+    public FileURI(String scheme, String authority, String path, String uriAsString) {
       this.scheme = scheme;
       this.authority = authority;
       this.path = path;
-      this.location = location;
+      this.uriAsString = uriAsString;
     }
 
-    public FileMetadata() {}
+    public FileURI() {}
 
     public void setScheme(String scheme) {
       this.scheme = scheme;
@@ -557,8 +634,8 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
       this.path = path;
     }
 
-    public void setLocation(String location) {
-      this.location = location;
+    public void setUriAsString(String uriAsString) {
+      this.uriAsString = uriAsString;
     }
 
     public String getScheme() {
@@ -573,8 +650,8 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
       return path;
     }
 
-    public String getLocation() {
-      return location;
+    public String getUriAsString() {
+      return uriAsString;
     }
   }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 46e6455190..b47f367dde 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -33,7 +33,6 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -222,8 +221,7 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
       }
     }
 
-    return String.format(
-        "Expiring snapshots (%s) in %s", Joiner.on(',').join(options), table.name());
+    return String.format("Expiring snapshots (%s) in %s", COMMA_JOINER.join(options), table.name());
   }
 
   private ExpireSnapshots.Result doExecute() {
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 20f995731e..10a33e836d 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.StringToFileURI;
 import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
@@ -986,12 +987,15 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
       Map<String, String> equalSchemes,
       Map<String, String> equalAuthorities,
       DeleteOrphanFiles.PrefixMismatchMode mode) {
-    Dataset<Row> validFilesDF = spark.createDataset(validFiles, Encoders.STRING()).toDF();
-    Dataset<Row> actualFilesDF = spark.createDataset(actualFiles, Encoders.STRING()).toDF();
+
+    StringToFileURI toFileUri = new StringToFileURI(equalSchemes, equalAuthorities);
+
+    Dataset<String> validFileDS = spark.createDataset(validFiles, Encoders.STRING());
+    Dataset<String> actualFileDS = spark.createDataset(actualFiles, Encoders.STRING());
 
     List<String> orphanFiles =
         DeleteOrphanFilesSparkAction.findOrphanFiles(
-            spark, actualFilesDF, validFilesDF, equalSchemes, equalAuthorities, mode);
+            spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
     Assert.assertEquals(expectedOrphanFiles, orphanFiles);
   }
 }