Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/07/27 01:40:42 UTC
[iceberg] branch master updated: Spark 3.2: Add prefix mismatch mode for deleting orphan files (#4652)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 679c3d4d95 Spark 3.2: Add prefix mismatch mode for deleting orphan files (#4652)
679c3d4d95 is described below
commit 679c3d4d95a8b29b9e4ca52fb2bf90838dd66d04
Author: Karuppayya <ka...@gmail.com>
AuthorDate: Tue Jul 26 18:40:38 2022 -0700
Spark 3.2: Add prefix mismatch mode for deleting orphan files (#4652)
---
.../apache/iceberg/actions/DeleteOrphanFiles.java | 66 +++++++
.../extensions/TestRemoveOrphanFilesProcedure.java | 89 +++++++++
.../actions/DeleteOrphanFilesSparkAction.java | 200 +++++++++++++++++++--
.../iceberg/spark/actions/SetAccumulator.java | 62 +++++++
.../procedures/RemoveOrphanFilesProcedure.java | 37 +++-
.../spark/actions/TestRemoveOrphanFilesAction.java | 111 ++++++++++++
6 files changed, 545 insertions(+), 20 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
index 4ee75a6e79..75e593f276 100644
--- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
@@ -19,8 +19,11 @@
package org.apache.iceberg.actions;
+import java.util.Locale;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
/**
* An action that deletes orphan metadata, data and delete files in a table.
@@ -80,6 +83,53 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+ /**
+ * Passes a prefix mismatch mode that determines how this action should handle situations when
+ * the metadata references files that match listed/provided files except for authority/scheme.
+ * <p>
+ * Possible values are "ERROR", "IGNORE", "DELETE". The default mismatch mode is "ERROR",
+ * which means an exception is thrown whenever there is a mismatch in authority/scheme.
+ * It is the recommended mode and should be changed only in rare circumstances.
+ * If there is a mismatch, use {@link #equalSchemes(Map)} and {@link #equalAuthorities(Map)}
+ * to resolve conflicts by providing equivalent schemes and authorities. If it is impossible
+ * to determine whether the conflicting authorities/schemes are equal, set the prefix mismatch
+ * mode to "IGNORE" to skip files with mismatches. If you have manually inspected all conflicting
+ * authorities/schemes, provided equivalent schemes/authorities and are absolutely confident
+ * the remaining ones are different, set the prefix mismatch mode to "DELETE" to consider files
+ * with mismatches as orphans. It will be impossible to recover files after deletion,
+ * so the "DELETE" prefix mismatch mode must be used with extreme caution.
+ *
+ * @param newPrefixMismatchMode mode for handling prefix mismatches
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement prefixMismatchMode");
+ }
+
+ /**
+ * Passes schemes that should be considered equal.
+ * <p>
+ * The key may include a comma-separated list of schemes. For instance, Map("s3a,s3,s3n", "s3").
+ *
+ * @param newEqualSchemes list of equal schemes
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles equalSchemes(Map<String, String> newEqualSchemes) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement equalSchemes");
+ }
+
+ /**
+ * Passes authorities that should be considered equal.
+ * <p>
+ * The key may include a comma-separated list of authorities. For instance, Map("s1name,s2name", "servicename").
+ *
+ * @param newEqualAuthorities list of equal authorities
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthorities) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement equalAuthorities");
+ }
+
/**
* The action result that contains a summary of the execution.
*/
@@ -89,4 +139,20 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*/
Iterable<String> orphanFileLocations();
}
+
+ /**
+ * Defines the action behavior when location prefixes (scheme/authority) mismatch.
+ * <p>
+ * {@link #ERROR} - throw an exception.
+ * {@link #IGNORE} - no action.
+ * {@link #DELETE} - delete files.
+ */
+ enum PrefixMismatchMode {
+ ERROR, IGNORE, DELETE;
+
+ public static PrefixMismatchMode fromString(String modeAsString) {
+ Preconditions.checkArgument(modeAsString != null, "Mode should not be null");
+ return PrefixMismatchMode.valueOf(modeAsString.toUpperCase(Locale.ENGLISH));
+ }
+ }
}
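For readers following along, here is a minimal usage sketch of the new API. This is not part of the commit: it assumes an already-loaded Table and goes through SparkActions, the Spark entry point whose implementation is changed below; the equivalence maps and authority names are illustrative.

// Hedged usage sketch, not part of this commit. `table` is assumed to be a
// loaded Iceberg Table; the chained methods match the interface added above.
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.actions.SparkActions;

class RemoveOrphansExample {
  static DeleteOrphanFiles.Result removeOrphans(Table table) {
    return SparkActions.get()
        .deleteOrphanFiles(table)
        // only files older than the cutoff are candidates for deletion
        .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
        // treat s3a/s3n locations as equivalent to s3 when comparing
        .equalSchemes(ImmutableMap.of("s3a,s3n", "s3"))
        // hypothetical nameservice aliases that refer to the same cluster
        .equalAuthorities(ImmutableMap.of("ns1,ns2", "nameservice"))
        // fail on any unexplained scheme/authority conflict (the default)
        .prefixMismatchMode(PrefixMismatchMode.ERROR)
        .execute();
  }
}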
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index e3a3bbf64b..70c09cb300 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.spark.extensions;
import java.io.IOException;
+import java.net.URI;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
@@ -28,18 +29,26 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
@@ -400,4 +409,84 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
.collectAsList();
Assert.assertEquals("Rows must match", records, actualRecords);
}
+
+ @Test
+ public void testRemoveOrphanFilesProcedureWithPrefixMode() throws NoSuchTableException, ParseException, IOException {
+ if (catalogName.equals("testhadoop")) {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ } else {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'", tableName,
+ temp.newFolder().toURI().toString());
+ }
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ String location = table.location();
+ Path originalPath = new Path(location);
+
+ URI uri = originalPath.toUri();
+ Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());
+
+ DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath, "path/to/data-a.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ DataFile dataFile2 = DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath, "path/to/data-b.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ table.newFastAppend()
+ .appendFile(dataFile1)
+ .appendFile(dataFile2)
+ .commit();
+
+ Timestamp lastModifiedTimestamp = new Timestamp(10000);
+
+ List<FilePathLastModifiedRecord> allFiles = Lists.newArrayList(
+ new FilePathLastModifiedRecord(
+ new Path(originalPath, "path/to/data-a.parquet").toString(),
+ lastModifiedTimestamp),
+ new FilePathLastModifiedRecord(
+ new Path(originalPath, "path/to/data-b.parquet").toString(),
+ lastModifiedTimestamp),
+ new FilePathLastModifiedRecord(
+ ReachableFileUtil.versionHintLocation(table),
+ lastModifiedTimestamp));
+
+ for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
+ allFiles.add(new FilePathLastModifiedRecord(file, lastModifiedTimestamp));
+ }
+
+ for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
+ allFiles.add(new FilePathLastModifiedRecord(manifest.path(), lastModifiedTimestamp));
+ }
+
+ Dataset<Row> compareToFileList = spark.createDataFrame(allFiles,
+ FilePathLastModifiedRecord.class).withColumnRenamed("filePath", "file_path")
+ .withColumnRenamed("lastModified", "last_modified");
+ String fileListViewName = "files_view";
+ compareToFileList.createOrReplaceTempView(fileListViewName);
+ List<Object[]> orphanFiles = sql(
+ "CALL %s.system.remove_orphan_files(" +
+ "table => '%s'," +
+ "equal_schemes => map('file1', 'file')," +
+ "file_list_view => '%s')",
+ catalogName, tableIdent, fileListViewName);
+ Assert.assertEquals(0, orphanFiles.size());
+
+ // Test with no equal schemes
+ AssertHelpers.assertThrows("Should complain about removing orphan files",
+ ValidationException.class, "Conflicting authorities/schemes: [(file1, file)]",
+ () -> sql(
+ "CALL %s.system.remove_orphan_files(" +
+ "table => '%s'," +
+ "file_list_view => '%s')",
+ catalogName, tableIdent, fileListViewName));
+
+ // The DROP TABLE in @After runs with purge and fails due to the invalid
+ // scheme "file1" used in this test, so drop the table here instead
+ sql("DROP TABLE %s", tableName);
+ }
}
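The test above drives the new options through equal_schemes only; for completeness, a hedged companion sketch in the same style (reusing the sql/catalogName/tableIdent helpers from the test base, with illustrative authority names) showing the other two new parameters:

// Hedged sketch, not part of this commit: map two hypothetical HDFS
// authorities onto one name and skip any unresolvable mismatches.
sql(
    "CALL %s.system.remove_orphan_files(" +
    "table => '%s'," +
    "equal_authorities => map('ns1,ns2', 'nameservice')," +
    "prefix_mismatch_mode => 'IGNORE')",
    catalogName, tableIdent);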
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 72b4f8f43a..9c0cebc57a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -19,13 +19,15 @@
package org.apache.iceberg.spark.actions;
-import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.net.URI;
import java.sql.Timestamp;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -47,26 +49,32 @@ import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Tuple2;
import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
@@ -94,14 +102,8 @@ public class DeleteOrphanFilesSparkAction
extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
- private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
- int lastIndex = path.lastIndexOf(File.separator);
- if (lastIndex == -1) {
- return path;
- } else {
- return path.substring(lastIndex + 1);
- }
- }, DataTypes.StringType);
+ private static final Splitter COMMA = Splitter.on(",");
+ private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");
private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
@@ -113,6 +115,9 @@ public class DeleteOrphanFilesSparkAction
}
};
+ private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
+ private Map<String, String> equalAuthorities = Collections.emptyMap();
+ private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
private Dataset<Row> compareToFileList;
@@ -143,6 +148,27 @@ public class DeleteOrphanFilesSparkAction
return this;
}
+ @Override
+ public DeleteOrphanFilesSparkAction prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
+ this.prefixMismatchMode = newPrefixMismatchMode;
+ return this;
+ }
+
+ @Override
+ public DeleteOrphanFilesSparkAction equalSchemes(Map<String, String> newEqualSchemes) {
+ this.equalSchemes = Maps.newHashMap();
+ equalSchemes.putAll(flattenMap(EQUAL_SCHEMES_DEFAULT));
+ equalSchemes.putAll(flattenMap(newEqualSchemes));
+ return this;
+ }
+
+ @Override
+ public DeleteOrphanFilesSparkAction equalAuthorities(Map<String, String> newEqualAuthorities) {
+ this.equalAuthorities = Maps.newHashMap();
+ equalAuthorities.putAll(flattenMap(newEqualAuthorities));
+ return this;
+ }
+
@Override
public DeleteOrphanFilesSparkAction location(String newLocation) {
this.location = newLocation;
@@ -213,14 +239,8 @@ public class DeleteOrphanFilesSparkAction
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
- Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
- Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
- Column nameEqual = actualFileName.equalTo(validFileName);
- Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
- Column joinCond = nameEqual.and(actualContains);
- List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
- .as(Encoders.STRING())
- .collectAsList();
+ List<String> orphanFiles = findOrphanFiles(spark(), actualFileDF, validFileDF,
+ equalSchemes, equalAuthorities, prefixMismatchMode);
Tasks.foreach(orphanFiles)
.noRetry()
@@ -326,6 +346,99 @@ public class DeleteOrphanFilesSparkAction
};
}
+ @VisibleForTesting
+ static List<String> findOrphanFiles(
+ SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+ Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+ PrefixMismatchMode prefixMismatchMode) {
+ Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+ toFileMetadata(equalSchemes, equalAuthorities),
+ Encoders.bean(FileMetadata.class));
+ Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+ toFileMetadata(equalSchemes, equalAuthorities),
+ Encoders.bean(FileMetadata.class));
+
+ SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+ spark.sparkContext().register(conflicts);
+
+ Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+ List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS, joinCond, "leftouter")
+ .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+ .collectAsList();
+
+ if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+ throw new ValidationException("Unable to determine whether certain files are orphan. " +
+ "Metadata references files that match listed/provided files except for authority/scheme. " +
+ "Please, inspect the conflicting authorities/schemes and provide which of them are equal " +
+ "by further configuring the action via equalSchemes() and equalAuthorities() methods. " +
+ "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " +
+ "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " +
+ "authorities/schemes are different. It will be impossible to recover deleted files. " +
+ "Conflicting authorities/schemes: %s.", conflicts.value());
+ }
+
+ return orphanFiles;
+ }
+
+ private static Map<String, String> flattenMap(Map<String, String> map) {
+ Map<String, String> flattenedMap = Maps.newHashMap();
+ if (map != null) {
+ for (String key : map.keySet()) {
+ String value = map.get(key);
+ for (String splitKey : COMMA.split(key)) {
+ flattenedMap.put(splitKey.trim(), value.trim());
+ }
+ }
+ }
+ return flattenedMap;
+ }
+
+ private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
+ PrefixMismatchMode mode,
+ SetAccumulator<Pair<String, String>> conflicts) {
+ return rows -> {
+ Iterator<String> transformed = Iterators.transform(rows, row -> {
+ FileMetadata actual = row._1;
+ FileMetadata valid = row._2;
+
+ if (valid == null) {
+ return actual.location;
+ }
+
+ boolean schemeMatch = Strings.isNullOrEmpty(valid.scheme) ||
+ valid.scheme.equalsIgnoreCase(actual.scheme);
+ boolean authorityMatch = Strings.isNullOrEmpty(valid.authority) ||
+ valid.authority.equalsIgnoreCase(actual.authority);
+
+ if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
+ return actual.location;
+ } else {
+ if (!schemeMatch) {
+ conflicts.add(Pair.of(valid.scheme, actual.scheme));
+ }
+ if (!authorityMatch) {
+ conflicts.add(Pair.of(valid.authority, actual.authority));
+ }
+ }
+
+ return null;
+ });
+ return Iterators.filter(transformed, Objects::nonNull);
+ };
+ }
+
+ private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
+ Map<String, String> equalSchemesMap, Map<String, String> equalAuthoritiesMap) {
+ return rows -> Iterators.transform(rows, row -> {
+ String location = row.getString(0);
+ URI uri = new Path(location).toUri();
+ String scheme = equalSchemesMap.getOrDefault(uri.getScheme(), uri.getScheme());
+ String authority = equalAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
+ return new FileMetadata(scheme, authority, uri.getPath(), location);
+ });
+ }
+
/**
* A {@link PathFilter} that filters out hidden path, but does not filter out paths that would be marked
* as hidden by {@link HiddenPathFilter} due to a partition field that starts with one of the characters that
@@ -361,4 +474,53 @@ public class DeleteOrphanFilesSparkAction
return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
}
}
+
+ public static class FileMetadata implements Serializable {
+ private String scheme;
+ private String authority;
+ private String path;
+ private String location;
+
+ public FileMetadata(String scheme, String authority, String path, String location) {
+ this.scheme = scheme;
+ this.authority = authority;
+ this.path = path;
+ this.location = location;
+ }
+
+ public FileMetadata() {
+ }
+
+ public void setScheme(String scheme) {
+ this.scheme = scheme;
+ }
+
+ public void setAuthority(String authority) {
+ this.authority = authority;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public void setLocation(String location) {
+ this.location = location;
+ }
+
+ public String getScheme() {
+ return scheme;
+ }
+
+ public String getAuthority() {
+ return authority;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+ }
}
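To make the comparison pipeline above concrete: toFileMetadata() rewrites each location's scheme and authority through the flattened equivalence maps, the join then matches on path alone, and any scheme/authority disagreement that survives normalization is deleted, ignored, or recorded as a conflict depending on the mode. A self-contained, illustrative sketch of the normalization step (assuming Hadoop's Path on the classpath and the default s3 equivalences):

// Hedged illustration of the normalization performed in toFileMetadata().
import java.net.URI;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class NormalizationExample {
  public static void main(String[] args) {
    // flattened form of EQUAL_SCHEMES_DEFAULT ("s3n,s3a" -> "s3")
    Map<String, String> equalSchemes = ImmutableMap.of("s3n", "s3", "s3a", "s3");
    URI uri = new Path("s3a://bucket/dir/file.parquet").toUri();
    String scheme = equalSchemes.getOrDefault(uri.getScheme(), uri.getScheme());
    // prints "s3 bucket /dir/file.parquet": an s3a and an s3 location
    // normalize to the same triple and join on path without a conflict
    System.out.println(scheme + " " + uri.getAuthority() + " " + uri.getPath());
  }
}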
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
new file mode 100644
index 0000000000..f16936949f
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.util.AccumulatorV2;
+
+public class SetAccumulator<T> extends AccumulatorV2<T, Set<T>> {
+
+ private final Set<T> set = Collections.synchronizedSet(Sets.newHashSet());
+
+ @Override
+ public boolean isZero() {
+ return set.isEmpty();
+ }
+
+ @Override
+ public AccumulatorV2<T, Set<T>> copy() {
+ SetAccumulator<T> newAccumulator = new SetAccumulator<>();
+ newAccumulator.set.addAll(set);
+ return newAccumulator;
+ }
+
+ @Override
+ public void reset() {
+ set.clear();
+ }
+
+ @Override
+ public void add(T v) {
+ set.add(v);
+ }
+
+ @Override
+ public void merge(AccumulatorV2<T, Set<T>> other) {
+ set.addAll(other.value());
+ }
+
+ @Override
+ public Set<T> value() {
+ return set;
+ }
+}
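SetAccumulator follows the standard AccumulatorV2 lifecycle, which is how the conflict set makes it back to the driver in findOrphanFiles(). A brief hedged sketch, with the SparkSession assumed:

// Hedged sketch, not part of this commit: register on the driver, add() from
// executor-side tasks, then read the merged value() back on the driver.
class SetAccumulatorExample {
  static java.util.Set<String> collect(org.apache.spark.sql.SparkSession spark) {
    SetAccumulator<String> seen = new SetAccumulator<>();
    spark.sparkContext().register(seen);
    // ... executor-side tasks call seen.add(...) while a job runs ...
    return seen.value();
  }
}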
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index a64a28b631..92b0ba30b5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -19,11 +19,14 @@
package org.apache.iceberg.spark.procedures;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
@@ -37,6 +40,7 @@ import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
+import scala.runtime.BoxedUnit;
/**
* A procedure that removes orphan files in a table.
@@ -51,7 +55,10 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
ProcedureParameter.optional("location", DataTypes.StringType),
ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
- ProcedureParameter.optional("file_list_view", DataTypes.StringType)
+ ProcedureParameter.optional("file_list_view", DataTypes.StringType),
+ ProcedureParameter.optional("equal_schemes", STRING_MAP),
+ ProcedureParameter.optional("equal_authorities", STRING_MAP),
+ ProcedureParameter.optional("prefix_mismatch_mode", DataTypes.StringType),
};
private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -94,6 +101,27 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
"max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
+ Map<String, String> equalSchemes = Maps.newHashMap();
+ if (!args.isNullAt(6)) {
+ args.getMap(6).foreach(DataTypes.StringType, DataTypes.StringType,
+ (k, v) -> {
+ equalSchemes.put(k.toString(), v.toString());
+ return BoxedUnit.UNIT;
+ });
+ }
+
+ Map<String, String> equalAuthorities = Maps.newHashMap();
+ if (!args.isNullAt(7)) {
+ args.getMap(7).foreach(DataTypes.StringType, DataTypes.StringType,
+ (k, v) -> {
+ equalAuthorities.put(k.toString(), v.toString());
+ return BoxedUnit.UNIT;
+ });
+ }
+
+ PrefixMismatchMode prefixMismatchMode = args.isNullAt(8) ? null :
+ PrefixMismatchMode.fromString(args.getString(8));
+
return withIcebergTable(tableIdent, table -> {
DeleteOrphanFilesSparkAction action = actions().deleteOrphanFiles(table);
@@ -121,6 +149,13 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
action.compareToFileList(spark().table(fileListView));
}
+ action.equalSchemes(equalSchemes);
+ action.equalAuthorities(equalAuthorities);
+
+ if (prefixMismatchMode != null) {
+ action.prefixMismatchMode(prefixMismatchMode);
+ }
+
DeleteOrphanFiles.Result result = action.execute();
return toOutputRows(result);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 1d9e479397..e7240edda6 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -955,4 +956,114 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
}
return current;
}
+
+ @Test
+ public void testPathsWithExtraSlashes() {
+ List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
+ List<String> actualFiles = Lists.newArrayList("file:///dir1/////dir2///file1");
+ executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithValidFileHavingNoAuthority() {
+ List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+ List<String> actualFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+ executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithActualFileHavingNoAuthority() {
+ List<String> validFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+ List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+ executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithEqualSchemes() {
+ List<String> validFiles = Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
+ List<String> actualFiles = Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");
+ AssertHelpers.assertThrows("Test remove orphan files with equal schemes",
+ ValidationException.class,
+ "Conflicting authorities/schemes: [(scheme1, scheme2)]",
+ () -> executeTest(validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+ Map<String, String> equalSchemes = Maps.newHashMap();
+ equalSchemes.put("scheme1", "scheme");
+ equalSchemes.put("scheme2", "scheme");
+ executeTest(validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ equalSchemes,
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+ }
+
+ @Test
+ public void testPathsWithEqualAuthorities() {
+ List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+ List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+ AssertHelpers.assertThrows("Test remove orphan files with equal authorities",
+ ValidationException.class,
+ "Conflicting authorities/schemes: [(servicename1, servicename2)]",
+ () -> executeTest(validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+ Map<String, String> equalAuthorities = Maps.newHashMap();
+ equalAuthorities.put("servicename1", "servicename");
+ equalAuthorities.put("servicename2", "servicename");
+ executeTest(validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ ImmutableMap.of(),
+ equalAuthorities,
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+ }
+
+ @Test
+ public void testRemoveOrphanFileActionWithDeleteMode() {
+ List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+ List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+
+ executeTest(validFiles,
+ actualFiles,
+ Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1"),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.DELETE);
+ }
+
+ private void executeTest(List<String> validFiles,
+ List<String> actualFiles,
+ List<String> expectedOrphanFiles) {
+ executeTest(validFiles, actualFiles, expectedOrphanFiles, ImmutableMap.of(), ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.IGNORE);
+ }
+
+ private void executeTest(List<String> validFiles,
+ List<String> actualFiles,
+ List<String> expectedOrphanFiles,
+ Map<String, String> equalSchemes,
+ Map<String, String> equalAuthorities,
+ DeleteOrphanFiles.PrefixMismatchMode mode) {
+ Dataset<Row> validFilesDF = spark.createDataset(validFiles, Encoders.STRING()).toDF();
+ Dataset<Row> actualFilesDF = spark.createDataset(actualFiles, Encoders.STRING()).toDF();
+
+ List<String> orphanFiles = DeleteOrphanFilesSparkAction.findOrphanFiles(
+ spark,
+ actualFilesDF,
+ validFilesDF,
+ equalSchemes,
+ equalAuthorities,
+ mode);
+ Assert.assertEquals(expectedOrphanFiles, orphanFiles);
+ }
}