Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/07/27 01:40:42 UTC

[iceberg] branch master updated: Spark 3.2: Add prefix mismatch mode for deleting orphan files (#4652)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 679c3d4d95 Spark 3.2: Add prefix mismatch mode for deleting orphan files (#4652)
679c3d4d95 is described below

commit 679c3d4d95a8b29b9e4ca52fb2bf90838dd66d04
Author: Karuppayya <ka...@gmail.com>
AuthorDate: Tue Jul 26 18:40:38 2022 -0700

    Spark 3.2: Add prefix mismatch mode for deleting orphan files (#4652)
---
 .../apache/iceberg/actions/DeleteOrphanFiles.java  |  66 +++++++
 .../extensions/TestRemoveOrphanFilesProcedure.java |  89 +++++++++
 .../actions/DeleteOrphanFilesSparkAction.java      | 200 +++++++++++++++++++--
 .../iceberg/spark/actions/SetAccumulator.java      |  62 +++++++
 .../procedures/RemoveOrphanFilesProcedure.java     |  37 +++-
 .../spark/actions/TestRemoveOrphanFilesAction.java | 111 ++++++++++++
 6 files changed, 545 insertions(+), 20 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
index 4ee75a6e79..75e593f276 100644
--- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
@@ -19,8 +19,11 @@
 
 package org.apache.iceberg.actions;
 
+import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 /**
  * An action that deletes orphan metadata, data and delete files in a table.
@@ -80,6 +83,53 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  /**
+   * Passes a prefix mismatch mode that determines how this action should handle situations when
+   * the metadata references files that match listed/provided files except for authority/scheme.
+   * <p>
+   * Possible values are "ERROR", "IGNORE", "DELETE". The default mismatch mode is "ERROR",
+   * which means an exception is thrown whenever there is a mismatch in authority/scheme.
+   * This is the recommended mode and should only be changed in rare circumstances.
+   * If there is a mismatch, use {@link #equalSchemes(Map)} and {@link #equalAuthorities(Map)}
+   * to resolve conflicts by providing equivalent schemes and authorities. If it is impossible
+   * to determine whether the conflicting authorities/schemes are equal, set the prefix mismatch
+   * mode to "IGNORE" to skip files with mismatches. If you have manually inspected all conflicting
+   * authorities/schemes, provided equivalent schemes/authorities and are absolutely confident
+   * the remaining ones are different, set the prefix mismatch mode to "DELETE" to consider files
+   * with mismatches as orphans. It will be impossible to recover files after deletion,
+   * so the "DELETE" prefix mismatch mode must be used with extreme caution.
+   *
+   * @param newPrefixMismatchMode mode for handling prefix mismatches
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does not implement prefixMismatchMode");
+  }
+
+  /**
+   * Passes schemes that should be considered equal.
+   * <p>
+   * The key may include a comma-separated list of schemes. For instance, Map("s3a,s3,s3n", "s3").
+   *
+   * @param newEqualSchemes list of equal schemes
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles equalSchemes(Map<String, String> newEqualSchemes) {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does not implement equalSchemes");
+  }
+
+  /**
+   * Passes authorities that should be considered equal.
+   * <p>
+   * The key may include a comma-separated list of authorities. For instance, Map("s1name,s2name", "servicename").
+   *
+   * @param newEqualAuthorities list of equal authorities
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthorities) {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does not implement equalAuthorities");
+  }
+
   /**
    * The action result that contains a summary of the execution.
    */
@@ -89,4 +139,20 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
      */
     Iterable<String> orphanFileLocations();
   }
+
+  /**
+   * Defines the action behavior when location prefixes (scheme/authority) mismatch.
+   * <p>
+   * {@link #ERROR} - throw an exception.
+   * {@link #IGNORE} - no action.
+   * {@link #DELETE} - delete files.
+   */
+  enum PrefixMismatchMode {
+    ERROR, IGNORE, DELETE;
+
+    public static PrefixMismatchMode fromString(String modeAsString) {
+      Preconditions.checkArgument(modeAsString != null, "Mode should not be null");
+      return PrefixMismatchMode.valueOf(modeAsString.toUpperCase(Locale.ENGLISH));
+    }
+  }
 }
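
The new API surface on DeleteOrphanFiles is chainable. A rough sketch of how a caller might
combine the new options through the existing SparkActions entry point (the spark session and
table variables are assumed to be in scope; the mappings shown are illustrative, not defaults):

    // Treat s3a/s3n locations as plain s3, treat the two HDFS nameservices as the same
    // authority, and skip (rather than delete) any prefix that still cannot be resolved.
    DeleteOrphanFiles.Result result = SparkActions.get(spark)
        .deleteOrphanFiles(table)
        .equalSchemes(ImmutableMap.of("s3a,s3n", "s3"))
        .equalAuthorities(ImmutableMap.of("servicename1,servicename2", "servicename"))
        .prefixMismatchMode(DeleteOrphanFiles.PrefixMismatchMode.IGNORE)
        .execute();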
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index e3a3bbf64b..70c09cb300 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import java.io.IOException;
+import java.net.URI;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
@@ -28,18 +29,26 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -400,4 +409,84 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testRemoveOrphanFilesProcedureWithPrefixMode() throws NoSuchTableException, ParseException, IOException {
+    if (catalogName.equals("testhadoop")) {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    } else {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'", tableName,
+          temp.newFolder().toURI().toString());
+    }
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    String location = table.location();
+    Path originalPath = new Path(location);
+
+    URI uri = originalPath.toUri();
+    Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());
+
+    DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath(new Path(newParentPath, "path/to/data-a.parquet").toString())
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+    DataFile dataFile2 = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath(new Path(newParentPath, "path/to/data-b.parquet").toString())
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+
+    table.newFastAppend()
+        .appendFile(dataFile1)
+        .appendFile(dataFile2)
+        .commit();
+
+
+    Timestamp lastModifiedTimestamp = new Timestamp(10000);
+
+    List<FilePathLastModifiedRecord> allFiles = Lists.newArrayList(
+        new FilePathLastModifiedRecord(
+            new Path(originalPath, "path/to/data-a.parquet").toString(),
+            lastModifiedTimestamp),
+        new FilePathLastModifiedRecord(
+            new Path(originalPath, "path/to/data-b.parquet").toString(),
+            lastModifiedTimestamp),
+        new FilePathLastModifiedRecord(
+            ReachableFileUtil.versionHintLocation(table),
+            lastModifiedTimestamp));
+
+    for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
+      allFiles.add(new FilePathLastModifiedRecord(file, lastModifiedTimestamp));
+    }
+
+    for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
+      allFiles.add(new FilePathLastModifiedRecord(manifest.path(), lastModifiedTimestamp));
+    }
+
+    Dataset<Row> compareToFileList = spark.createDataFrame(allFiles, FilePathLastModifiedRecord.class)
+        .withColumnRenamed("filePath", "file_path")
+        .withColumnRenamed("lastModified", "last_modified");
+    String fileListViewName = "files_view";
+    compareToFileList.createOrReplaceTempView(fileListViewName);
+    List<Object[]> orphanFiles = sql(
+        "CALL %s.system.remove_orphan_files(" +
+            "table => '%s'," +
+            "equal_schemes => map('file1', 'file')," +
+            "file_list_view => '%s')",
+        catalogName, tableIdent, fileListViewName);
+    Assert.assertEquals(0, orphanFiles.size());
+
+    // Test with no equal schemes
+    AssertHelpers.assertThrows("Should complain about removing orphan files",
+        ValidationException.class, "Conflicting authorities/schemes: [(file1, file)]",
+        () -> sql(
+            "CALL %s.system.remove_orphan_files(" +
+                "table => '%s'," +
+                "file_list_view => '%s')",
+            catalogName, tableIdent, fileListViewName));
+
+    // The purge-enabled DROP TABLE in the test teardown fails because of the invalid scheme "file1"
+    // used in this test, so drop the table here instead
+    sql("DROP TABLE %s", tableName);
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 72b4f8f43a..9c0cebc57a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -19,13 +19,15 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
 import java.sql.Timestamp;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -47,26 +49,32 @@ import org.apache.iceberg.hadoop.HiddenPathFilter;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.util.SerializableConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
@@ -94,14 +102,8 @@ public class DeleteOrphanFilesSparkAction
     extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
 
   private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");
 
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
@@ -113,6 +115,9 @@ public class DeleteOrphanFilesSparkAction
     }
   };
 
+  private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
+  private Map<String, String> equalAuthorities = Collections.emptyMap();
+  private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
   private String location = null;
   private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
   private Dataset<Row> compareToFileList;
@@ -143,6 +148,27 @@ public class DeleteOrphanFilesSparkAction
     return this;
   }
 
+  @Override
+  public DeleteOrphanFilesSparkAction prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
+    this.prefixMismatchMode = newPrefixMismatchMode;
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFilesSparkAction equalSchemes(Map<String, String> newEqualSchemes) {
+    this.equalSchemes = Maps.newHashMap();
+    equalSchemes.putAll(flattenMap(EQUAL_SCHEMES_DEFAULT));
+    equalSchemes.putAll(flattenMap(newEqualSchemes));
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFilesSparkAction equalAuthorities(Map<String, String> newEqualAuthorities) {
+    this.equalAuthorities = Maps.newHashMap();
+    equalAuthorities.putAll(flattenMap(newEqualAuthorities));
+    return this;
+  }
+
   @Override
   public DeleteOrphanFilesSparkAction location(String newLocation) {
     this.location = newLocation;
@@ -213,14 +239,8 @@ public class DeleteOrphanFilesSparkAction
     Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
     Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
 
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
+    List<String> orphanFiles = findOrphanFiles(spark(), actualFileDF, validFileDF,
+        equalSchemes, equalAuthorities, prefixMismatchMode);
 
     Tasks.foreach(orphanFiles)
         .noRetry()
@@ -326,6 +346,99 @@ public class DeleteOrphanFilesSparkAction
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(
+      SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+      Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+      PrefixMismatchMode prefixMismatchMode) {
+    Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+    Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+
+    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+    spark.sparkContext().register(conflicts);
+
+    Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+    List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS, joinCond, "leftouter")
+        .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+      throw new ValidationException("Unable to determine whether certain files are orphan. " +
+          "Metadata references files that match listed/provided files except for authority/scheme. " +
+          "Please, inspect the conflicting authorities/schemes and provide which of them are equal " +
+          "by further configuring the action via equalSchemes() and equalAuthorities() methods. " +
+          "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " +
+          "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " +
+          "authorities/schemes are different. It will be impossible to recover deleted files. " +
+          "Conflicting authorities/schemes: %s.", conflicts.value());
+    }
+
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> map) {
+    Map<String, String> flattenedMap = Maps.newHashMap();
+    if (map != null) {
+      for (String key : map.keySet()) {
+        String value = map.get(key);
+        for (String splitKey : COMMA.split(key)) {
+          flattenedMap.put(splitKey.trim(), value.trim());
+        }
+      }
+    }
+    return flattenedMap;
+  }
+
+  private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
+      PrefixMismatchMode mode,
+      SetAccumulator<Pair<String, String>> conflicts) {
+    return rows -> {
+      Iterator<String> transformed = Iterators.transform(rows, row -> {
+        FileMetadata actual = row._1;
+        FileMetadata valid = row._2;
+
+        if (valid == null) {
+          return actual.location;
+        }
+
+        boolean schemeMatch = Strings.isNullOrEmpty(valid.scheme) ||
+            valid.scheme.equalsIgnoreCase(actual.scheme);
+        boolean authorityMatch = Strings.isNullOrEmpty(valid.authority) ||
+            valid.authority.equalsIgnoreCase(actual.authority);
+
+        if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
+          return actual.location;
+        } else {
+          if (!schemeMatch) {
+            conflicts.add(Pair.of(valid.scheme, actual.scheme));
+          }
+          if (!authorityMatch) {
+            conflicts.add(Pair.of(valid.authority, actual.authority));
+          }
+        }
+
+        return null;
+      });
+      return Iterators.filter(transformed, Objects::nonNull);
+    };
+  }
+
+  private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
+      Map<String, String> equalSchemesMap, Map<String, String> equalAuthoritiesMap) {
+    return rows -> Iterators.transform(rows, row -> {
+      String location = row.getString(0);
+      URI uri = new Path(location).toUri();
+      String scheme = equalSchemesMap.getOrDefault(uri.getScheme(), uri.getScheme());
+      String authority = equalAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
+      return new FileMetadata(scheme, authority, uri.getPath(), location);
+    });
+  }
+
   /**
    * A {@link PathFilter} that filters out hidden path, but does not filter out paths that would be marked
    * as hidden by {@link HiddenPathFilter} due to a partition field that starts with one of the characters that
@@ -361,4 +474,53 @@ public class DeleteOrphanFilesSparkAction
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  public static class FileMetadata implements Serializable {
+    private String scheme;
+    private String authority;
+    private String path;
+    private String location;
+
+    public FileMetadata(String scheme, String authority, String path, String location) {
+      this.scheme = scheme;
+      this.authority = authority;
+      this.path = path;
+      this.location = location;
+    }
+
+    public FileMetadata() {
+    }
+
+    public void setScheme(String scheme) {
+      this.scheme = scheme;
+    }
+
+    public void setAuthority(String authority) {
+      this.authority = authority;
+    }
+
+    public void setPath(String path) {
+      this.path = path;
+    }
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
+
+    public String getScheme() {
+      return scheme;
+    }
+
+    public String getAuthority() {
+      return authority;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public String getLocation() {
+      return location;
+    }
+  }
 }
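
The join above compares normalized locations: each path becomes a FileMetadata whose scheme and
authority are first rewritten through the flattened equal-schemes/equal-authorities maps, and only
rows that match on path but differ in scheme or authority are treated as conflicts. A standalone
sketch of that normalization rule, outside of any Iceberg or Spark classes (the locations and the
mapping are made up; the mapping is equivalent to the flattened EQUAL_SCHEMES_DEFAULT above):

    import java.net.URI;
    import java.util.Map;

    public class PrefixNormalizationSketch {
      public static void main(String[] args) {
        // "s3n,s3a" -> "s3" after flattenMap
        Map<String, String> equalSchemes = Map.of("s3n", "s3", "s3a", "s3");

        URI valid = URI.create("s3://bucket/db/tbl/data/f1.parquet");    // from table metadata
        URI actual = URI.create("s3a://bucket/db/tbl/data/f1.parquet");  // from the file listing

        String validScheme = equalSchemes.getOrDefault(valid.getScheme(), valid.getScheme());
        String actualScheme = equalSchemes.getOrDefault(actual.getScheme(), actual.getScheme());

        boolean samePath = valid.getPath().equals(actual.getPath());      // true: the join matches
        boolean sameScheme = validScheme.equalsIgnoreCase(actualScheme);  // true: no conflict
        System.out.println(samePath && sameScheme);                       // f1 is not an orphan
      }
    }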
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
new file mode 100644
index 0000000000..f16936949f
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.util.AccumulatorV2;
+
+public class SetAccumulator<T> extends AccumulatorV2<T, Set<T>> {
+
+  private final Set<T> set = Collections.synchronizedSet(Sets.newHashSet());
+
+  @Override
+  public boolean isZero() {
+    return set.isEmpty();
+  }
+
+  @Override
+  public AccumulatorV2<T, Set<T>> copy() {
+    SetAccumulator<T> newAccumulator = new SetAccumulator<>();
+    newAccumulator.set.addAll(set);
+    return newAccumulator;
+  }
+
+  @Override
+  public void reset() {
+    set.clear();
+  }
+
+  @Override
+  public void add(T v) {
+    set.add(v);
+  }
+
+  @Override
+  public void merge(AccumulatorV2<T, Set<T>> other) {
+    set.addAll(other.value());
+  }
+
+  @Override
+  public Set<T> value() {
+    return set;
+  }
+}
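
SetAccumulator is a minimal AccumulatorV2 that merges per-task sets back on the driver. A short
sketch of the intended usage, mirroring the registration in DeleteOrphanFilesSparkAction above
(the spark session variable is assumed to be in scope):

    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
    spark.sparkContext().register(conflicts);
    // tasks running on executors record mismatches, e.g. conflicts.add(Pair.of("s3", "s3x"));
    // once the job has completed, the driver reads the merged set:
    Set<Pair<String, String>> observed = conflicts.value();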
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index a64a28b631..92b0ba30b5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -19,11 +19,14 @@
 
 package org.apache.iceberg.spark.procedures;
 
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
@@ -37,6 +40,7 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
+import scala.runtime.BoxedUnit;
 
 /**
  * A procedure that removes orphan files in a table.
@@ -51,7 +55,10 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
       ProcedureParameter.optional("location", DataTypes.StringType),
       ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
       ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
-      ProcedureParameter.optional("file_list_view", DataTypes.StringType)
+      ProcedureParameter.optional("file_list_view", DataTypes.StringType),
+      ProcedureParameter.optional("equal_schemes", STRING_MAP),
+      ProcedureParameter.optional("equal_authorities", STRING_MAP),
+      ProcedureParameter.optional("prefix_mismatch_mode", DataTypes.StringType),
   };
 
   private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -94,6 +101,27 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
     Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
             "max_concurrent_deletes should have value > 0,  value: " + maxConcurrentDeletes);
 
+    Map<String, String> equalSchemes = Maps.newHashMap();
+    if (!args.isNullAt(6)) {
+      args.getMap(6).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            equalSchemes.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    Map<String, String> equalAuthorities = Maps.newHashMap();
+    if (!args.isNullAt(7)) {
+      args.getMap(7).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            equalAuthorities.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    PrefixMismatchMode prefixMismatchMode = args.isNullAt(8) ? null :
+        PrefixMismatchMode.fromString(args.getString(8));
+
     return withIcebergTable(tableIdent, table -> {
       DeleteOrphanFilesSparkAction action = actions().deleteOrphanFiles(table);
 
@@ -121,6 +149,13 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
         action.compareToFileList(spark().table(fileListView));
       }
 
+      action.equalSchemes(equalSchemes);
+      action.equalAuthorities(equalAuthorities);
+
+      if (prefixMismatchMode != null) {
+        action.prefixMismatchMode(prefixMismatchMode);
+      }
+
       DeleteOrphanFiles.Result result = action.execute();
 
       return toOutputRows(result);
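
With the new procedure arguments in place, a typical invocation from Spark SQL looks like the
sketch below (catalog, table, and mapping values are placeholders; the syntax follows the call
used in TestRemoveOrphanFilesProcedure):

    spark.sql(
        "CALL my_catalog.system.remove_orphan_files(" +
            "table => 'db.sample'," +
            "equal_schemes => map('s3a,s3n', 's3')," +
            "equal_authorities => map('servicename1,servicename2', 'servicename')," +
            "prefix_mismatch_mode => 'IGNORE')");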
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 1d9e479397..e7240edda6 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -955,4 +956,114 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
     }
     return current;
   }
+
+  @Test
+  public void testPathsWithExtraSlashes() {
+    List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
+    List<String> actualFiles = Lists.newArrayList("file:///dir1/////dir2///file1");
+    executeTest(validFiles, actualFiles, Lists.newArrayList());
+  }
+
+  @Test
+  public void testPathsWithValidFileHavingNoAuthority() {
+    List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+    List<String> actualFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+    executeTest(validFiles, actualFiles, Lists.newArrayList());
+  }
+
+  @Test
+  public void testPathsWithActualFileHavingNoAuthority() {
+    List<String> validFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+    List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+    executeTest(validFiles, actualFiles, Lists.newArrayList());
+  }
+
+  @Test
+  public void testPathsWithEqualSchemes() {
+    List<String> validFiles = Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
+    List<String> actualFiles = Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");
+    AssertHelpers.assertThrows("Test remove orphan files with equal schemes",
+        ValidationException.class,
+        "Conflicting authorities/schemes: [(scheme1, scheme2)]",
+        () -> executeTest(validFiles,
+            actualFiles,
+            Lists.newArrayList(),
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+    Map<String, String> equalSchemes = Maps.newHashMap();
+    equalSchemes.put("scheme1", "scheme");
+    equalSchemes.put("scheme2", "scheme");
+    executeTest(validFiles,
+        actualFiles,
+        Lists.newArrayList(),
+        equalSchemes,
+        ImmutableMap.of(),
+        DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+  }
+
+  @Test
+  public void testPathsWithEqualAuthorities() {
+    List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+    List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+    AssertHelpers.assertThrows("Test remove orphan files with equal authorities",
+        ValidationException.class,
+        "Conflicting authorities/schemes: [(servicename1, servicename2)]",
+        () -> executeTest(validFiles,
+            actualFiles,
+            Lists.newArrayList(),
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+    Map<String, String> equalAuthorities = Maps.newHashMap();
+    equalAuthorities.put("servicename1", "servicename");
+    equalAuthorities.put("servicename2", "servicename");
+    executeTest(validFiles,
+        actualFiles,
+        Lists.newArrayList(),
+        ImmutableMap.of(),
+        equalAuthorities,
+        DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+  }
+
+  @Test
+  public void testRemoveOrphanFileActionWithDeleteMode() {
+    List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+    List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+
+    executeTest(validFiles,
+        actualFiles,
+        Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1"),
+        ImmutableMap.of(),
+        ImmutableMap.of(),
+        DeleteOrphanFiles.PrefixMismatchMode.DELETE);
+  }
+
+  private void executeTest(List<String> validFiles,
+                           List<String> actualFiles,
+                           List<String> expectedOrphanFiles) {
+    executeTest(validFiles, actualFiles, expectedOrphanFiles, ImmutableMap.of(), ImmutableMap.of(),
+        DeleteOrphanFiles.PrefixMismatchMode.IGNORE);
+  }
+
+  private void executeTest(List<String> validFiles,
+                           List<String> actualFiles,
+                           List<String> expectedOrphanFiles,
+                           Map<String, String> equalSchemes,
+                           Map<String, String> equalAuthorities,
+                           DeleteOrphanFiles.PrefixMismatchMode mode) {
+    Dataset<Row> validFilesDF = spark.createDataset(validFiles, Encoders.STRING()).toDF();
+    Dataset<Row> actualFilesDF = spark.createDataset(actualFiles, Encoders.STRING()).toDF();
+
+    List<String> orphanFiles = DeleteOrphanFilesSparkAction.findOrphanFiles(
+        spark,
+        actualFilesDF,
+        validFilesDF,
+        equalSchemes,
+        equalAuthorities,
+        mode);
+    Assert.assertEquals(expectedOrphanFiles, orphanFiles);
+  }
 }