Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/08/18 16:22:45 UTC
[iceberg] branch master updated: Spark 3.3: Reduce serialization in DeleteOrphanFilesSparkAction (#5495)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c73c7729b2 Spark 3.3: Reduce serialization in DeleteOrphanFilesSparkAction (#5495)
c73c7729b2 is described below
commit c73c7729b20078d8a3c59be0f6c1c44635cbe090
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Aug 18 09:22:39 2022 -0700
Spark 3.3: Reduce serialization in DeleteOrphanFilesSparkAction (#5495)
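The patch reworks DeleteOrphanFilesSparkAction to keep file listings as typed datasets end to end instead of round-tripping through Dataset<Row>. Each source of valid files (content files, manifests, manifest lists, other metadata files) is mapped to a FileURI bean before the unions, per the in-diff comment, to avoid extra serialization/deserialization, and FileMetadata becomes FileURI with one shared bean encoder (FileURI.ENCODER) rather than a fresh Encoders.bean(...) per call. The lambda-returning helpers are replaced by named function classes (ListDirsRecursively, FindOrphanFiles, and the StringToFileURI/FileInfoToFileURI variants of ToFileURI), RuntimeIOException gives way to java.io.UncheckedIOException, the hard-coded listing limits become named constants, and the comma Splitter/Joiner move up into BaseSparkAction for reuse by ExpireSnapshotsSparkAction.

As a minimal, runnable sketch of the shared-encoder and transform-before-union pattern the patch applies (EncoderReuseDemo, PathRecord, ToPathRecord, and the local[2] session are illustrative stand-ins, not part of this commit; it assumes the usual Spark 3.3 dependencies on the classpath):

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.api.java.function.MapPartitionsFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class EncoderReuseDemo {

      // Bean with one shared encoder, mirroring FileURI.ENCODER: resolved once
      // and reused by every mapPartitions call instead of rebuilt per call site.
      public static class PathRecord {
        public static final Encoder<PathRecord> ENCODER = Encoders.bean(PathRecord.class);

        private String path;

        public PathRecord() {}

        PathRecord(String path) {
          this.path = path;
        }

        public String getPath() {
          return path;
        }

        public void setPath(String path) {
          this.path = path;
        }
      }

      // Named function class: its serialized task payload is exactly its declared
      // fields (none here), the shape the patch uses for ListDirsRecursively and ToFileURI.
      static class ToPathRecord implements MapPartitionsFunction<String, PathRecord> {
        @Override
        public Iterator<PathRecord> call(Iterator<String> paths) {
          return new Iterator<PathRecord>() {
            @Override
            public boolean hasNext() {
              return paths.hasNext();
            }

            @Override
            public PathRecord next() {
              return new PathRecord(paths.next());
            }
          };
        }
      }

      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().master("local[2]").appName("demo").getOrCreate();

        Dataset<String> listed =
            spark.createDataset(Arrays.asList("s3a://bucket/x"), Encoders.STRING());
        Dataset<String> declared =
            spark.createDataset(Arrays.asList("s3://bucket/y"), Encoders.STRING());

        ToPathRecord toRecord = new ToPathRecord();

        // Convert each leg to the typed bean *before* the union, as the patch does
        // in validFileIdentDS(), rather than unioning rows and converting after.
        Dataset<PathRecord> all =
            listed.mapPartitions(toRecord, PathRecord.ENCODER)
                .union(declared.mapPartitions(toRecord, PathRecord.ENCODER));

        all.show(false);
        spark.stop();
      }
    }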
---
.../iceberg/spark/actions/BaseSparkAction.java | 6 +-
.../actions/DeleteOrphanFilesSparkAction.java | 377 +++++++++++++--------
.../spark/actions/ExpireSnapshotsSparkAction.java | 4 +-
.../spark/actions/TestRemoveOrphanFilesAction.java | 10 +-
4 files changed, 240 insertions(+), 157 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 728d21df06..a1dfa6009e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -47,6 +47,8 @@ import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -72,9 +74,11 @@ abstract class BaseSparkAction<ThisT> {
protected static final String OTHERS = "Others";
protected static final String FILE_PATH = "file_path";
- protected static final String FILE_TYPE = "file_type";
protected static final String LAST_MODIFIED = "last_modified";
+ protected static final Splitter COMMA_SPLITTER = Splitter.on(",");
+ protected static final Joiner COMMA_JOINER = Joiner.on(',');
+
private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class);
private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
private static final int DELETE_NUM_RETRIES = 3;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 527783b9c8..1abd2107ed 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -23,6 +23,7 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
import java.io.IOException;
import java.io.Serializable;
+import java.io.UncheckedIOException;
import java.net.URI;
import java.sql.Timestamp;
import java.util.Collections;
@@ -45,13 +46,10 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
@@ -67,6 +65,7 @@ import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -103,11 +102,14 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
implements DeleteOrphanFiles {
private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
- private static final Splitter COMMA = Splitter.on(",");
private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");
+ private static final int MAX_DRIVER_LISTING_DEPTH = 3;
+ private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10;
+ private static final int MAX_EXECUTOR_LISTING_DEPTH = 2000;
+ private static final int MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS = Integer.MAX_VALUE;
private final SerializableConfiguration hadoopConf;
- private final int partitionDiscoveryParallelism;
+ private final int listingParallelism;
private final Table table;
private final Consumer<String> defaultDelete =
new Consumer<String>() {
@@ -130,8 +132,7 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
super(spark);
this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
- this.partitionDiscoveryParallelism =
- spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
+ this.listingParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
this.table = table;
this.location = table.location();
@@ -211,14 +212,15 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
return this;
}
- private Dataset<Row> filteredCompareToFileList() {
+ private Dataset<String> filteredCompareToFileList() {
Dataset<Row> files = compareToFileList;
if (location != null) {
files = files.filter(files.col(FILE_PATH).startsWith(location));
}
return files
.filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp)))
- .select(files.col(FILE_PATH));
+ .select(files.col(FILE_PATH))
+ .as(Encoders.STRING());
}
@Override
@@ -233,18 +235,16 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
if (location != null) {
options.add("location=" + location);
}
- return String.format(
- "Deleting orphan files (%s) from %s", Joiner.on(',').join(options), table.name());
+ String optionsAsString = COMMA_JOINER.join(options);
+ return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name());
}
private DeleteOrphanFiles.Result doExecute() {
- Dataset<Row> validFileDF = buildValidFileDF();
- Dataset<Row> actualFileDF =
- compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
+ Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
+ Dataset<FileURI> validFileIdentDS = validFileIdentDS();
List<String> orphanFiles =
- findOrphanFiles(
- spark(), actualFileDF, validFileDF, equalSchemes, equalAuthorities, prefixMismatchMode);
+ findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
Tasks.foreach(orphanFiles)
.noRetry()
@@ -256,41 +256,64 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
return new BaseDeleteOrphanFilesActionResult(orphanFiles);
}
- private Dataset<Row> buildValidFileDF() {
- return contentFileDS(table)
- .union(manifestDS(table))
- .union(manifestListDS(table))
- .union(otherMetadataFileDS(table))
- .toDF(FILE_PATH, FILE_TYPE)
- .select(FILE_PATH);
+ private Dataset<FileURI> validFileIdentDS() {
+ // transform before union to avoid extra serialization/deserialization
+ FileInfoToFileURI toFileURI = new FileInfoToFileURI(equalSchemes, equalAuthorities);
+
+ Dataset<FileURI> contentFileIdentDS = toFileURI.apply(contentFileDS(table));
+ Dataset<FileURI> manifestFileIdentDS = toFileURI.apply(manifestDS(table));
+ Dataset<FileURI> manifestListIdentDS = toFileURI.apply(manifestListDS(table));
+ Dataset<FileURI> otherMetadataFileIdentDS = toFileURI.apply(otherMetadataFileDS(table));
+
+ return contentFileIdentDS
+ .union(manifestFileIdentDS)
+ .union(manifestListIdentDS)
+ .union(otherMetadataFileIdentDS);
+ }
+
+ private Dataset<FileURI> actualFileIdentDS() {
+ StringToFileURI toFileURI = new StringToFileURI(equalSchemes, equalAuthorities);
+ if (compareToFileList == null) {
+ return toFileURI.apply(listedFileDS());
+ } else {
+ return toFileURI.apply(filteredCompareToFileList());
+ }
}
- private Dataset<Row> buildActualFileDF() {
+ private Dataset<String> listedFileDS() {
List<String> subDirs = Lists.newArrayList();
List<String> matchingFiles = Lists.newArrayList();
Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
- // list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver
+ // list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
+ // less than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
listDirRecursively(
- location, predicate, hadoopConf.value(), 3, 10, subDirs, pathFilter, matchingFiles);
+ location,
+ predicate,
+ hadoopConf.value(),
+ MAX_DRIVER_LISTING_DEPTH,
+ MAX_DRIVER_LISTING_DIRECT_SUB_DIRS,
+ subDirs,
+ pathFilter,
+ matchingFiles);
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
if (subDirs.isEmpty()) {
- return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
+ return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());
}
- int parallelism = Math.min(subDirs.size(), partitionDiscoveryParallelism);
+ int parallelism = Math.min(subDirs.size(), listingParallelism);
JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);
Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
- JavaRDD<String> matchingLeafFileRDD =
- subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp, pathFilter));
+ ListDirsRecursively listDirs = new ListDirsRecursively(conf, olderThanTimestamp, pathFilter);
+ JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirs);
JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
- return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
+ return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING());
}
private static void listDirRecursively(
@@ -341,68 +364,26 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
matchingFiles);
}
} catch (IOException e) {
- throw new RuntimeIOException(e);
+ throw new UncheckedIOException(e);
}
}
- private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
- Broadcast<SerializableConfiguration> conf, long olderThanTimestamp, PathFilter pathFilter) {
-
- return dirs -> {
- List<String> subDirs = Lists.newArrayList();
- List<String> files = Lists.newArrayList();
-
- Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
-
- int maxDepth = 2000;
- int maxDirectSubDirs = Integer.MAX_VALUE;
-
- dirs.forEachRemaining(
- dir -> {
- listDirRecursively(
- dir,
- predicate,
- conf.value().value(),
- maxDepth,
- maxDirectSubDirs,
- subDirs,
- pathFilter,
- files);
- });
-
- if (!subDirs.isEmpty()) {
- throw new RuntimeException(
- "Could not list subdirectories, reached maximum subdirectory depth: " + maxDepth);
- }
-
- return files.iterator();
- };
- }
-
@VisibleForTesting
static List<String> findOrphanFiles(
SparkSession spark,
- Dataset<Row> actualFileDF,
- Dataset<Row> validFileDF,
- Map<String, String> equalSchemes,
- Map<String, String> equalAuthorities,
+ Dataset<FileURI> actualFileIdentDS,
+ Dataset<FileURI> validFileIdentDS,
PrefixMismatchMode prefixMismatchMode) {
- Dataset<FileMetadata> actualFileMetadataDS =
- actualFileDF.mapPartitions(
- toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
- Dataset<FileMetadata> validFileMetadataDS =
- validFileDF.mapPartitions(
- toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
spark.sparkContext().register(conflicts);
- Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+ Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));
List<String> orphanFiles =
- actualFileMetadataDS
- .joinWith(validFileMetadataDS, joinCond, "leftouter")
- .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+ actualFileIdentDS
+ .joinWith(validFileIdentDS, joinCond, "leftouter")
+ .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
.collectAsList();
if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
@@ -426,7 +407,7 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
if (map != null) {
for (String key : map.keySet()) {
String value = map.get(key);
- for (String splitKey : COMMA.split(key)) {
+ for (String splitKey : COMMA_SPLITTER.split(key)) {
flattenedMap.put(splitKey.trim(), value.trim());
}
}
@@ -434,57 +415,150 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
return flattenedMap;
}
- private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
- PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) {
- return rows -> {
- Iterator<String> transformed =
- Iterators.transform(
- rows,
- row -> {
- FileMetadata actual = row._1;
- FileMetadata valid = row._2;
-
- if (valid == null) {
- return actual.location;
- }
-
- boolean schemeMatch =
- Strings.isNullOrEmpty(valid.scheme)
- || valid.scheme.equalsIgnoreCase(actual.scheme);
- boolean authorityMatch =
- Strings.isNullOrEmpty(valid.authority)
- || valid.authority.equalsIgnoreCase(actual.authority);
-
- if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
- return actual.location;
- } else {
- if (!schemeMatch) {
- conflicts.add(Pair.of(valid.scheme, actual.scheme));
- }
- if (!authorityMatch) {
- conflicts.add(Pair.of(valid.authority, actual.authority));
- }
- }
-
- return null;
- });
- return Iterators.filter(transformed, Objects::nonNull);
- };
+ private static class ListDirsRecursively implements FlatMapFunction<Iterator<String>, String> {
+
+ private final Broadcast<SerializableConfiguration> hadoopConf;
+ private final long olderThanTimestamp;
+ private final PathFilter pathFilter;
+
+ ListDirsRecursively(
+ Broadcast<SerializableConfiguration> hadoopConf,
+ long olderThanTimestamp,
+ PathFilter pathFilter) {
+
+ this.hadoopConf = hadoopConf;
+ this.olderThanTimestamp = olderThanTimestamp;
+ this.pathFilter = pathFilter;
+ }
+
+ @Override
+ public Iterator<String> call(Iterator<String> dirs) throws Exception {
+ List<String> subDirs = Lists.newArrayList();
+ List<String> files = Lists.newArrayList();
+
+ Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
+
+ while (dirs.hasNext()) {
+ listDirRecursively(
+ dirs.next(),
+ predicate,
+ hadoopConf.value().value(),
+ MAX_EXECUTOR_LISTING_DEPTH,
+ MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS,
+ subDirs,
+ pathFilter,
+ files);
+ }
+
+ if (!subDirs.isEmpty()) {
+ throw new RuntimeException(
+ "Could not list sub directories, reached maximum depth: " + MAX_EXECUTOR_LISTING_DEPTH);
+ }
+
+ return files.iterator();
+ }
+ }
+
+ private static class FindOrphanFiles
+ implements MapPartitionsFunction<Tuple2<FileURI, FileURI>, String> {
+
+ private final PrefixMismatchMode mode;
+ private final SetAccumulator<Pair<String, String>> conflicts;
+
+ FindOrphanFiles(PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) {
+ this.mode = mode;
+ this.conflicts = conflicts;
+ }
+
+ @Override
+ public Iterator<String> call(Iterator<Tuple2<FileURI, FileURI>> rows) throws Exception {
+ Iterator<String> orphanFiles = Iterators.transform(rows, this::toOrphanFile);
+ return Iterators.filter(orphanFiles, Objects::nonNull);
+ }
+
+ private String toOrphanFile(Tuple2<FileURI, FileURI> row) {
+ FileURI actual = row._1;
+ FileURI valid = row._2;
+
+ if (valid == null) {
+ return actual.uriAsString;
+ }
+
+ boolean schemeMatch = uriComponentMatch(valid.scheme, actual.scheme);
+ boolean authorityMatch = uriComponentMatch(valid.authority, actual.authority);
+
+ if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
+ return actual.uriAsString;
+ } else {
+ if (!schemeMatch) {
+ conflicts.add(Pair.of(valid.scheme, actual.scheme));
+ }
+
+ if (!authorityMatch) {
+ conflicts.add(Pair.of(valid.authority, actual.authority));
+ }
+
+ return null;
+ }
+ }
+
+ private boolean uriComponentMatch(String valid, String actual) {
+ return Strings.isNullOrEmpty(valid) || valid.equalsIgnoreCase(actual);
+ }
+ }
+
+ @VisibleForTesting
+ static class StringToFileURI extends ToFileURI<String> {
+ StringToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
+ super(equalSchemes, equalAuthorities);
+ }
+
+ @Override
+ protected String uriAsString(String input) {
+ return input;
+ }
+ }
+
+ @VisibleForTesting
+ static class FileInfoToFileURI extends ToFileURI<FileInfo> {
+ FileInfoToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
+ super(equalSchemes, equalAuthorities);
+ }
+
+ @Override
+ protected String uriAsString(FileInfo fileInfo) {
+ return fileInfo.getPath();
+ }
}
- private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
- Map<String, String> equalSchemesMap, Map<String, String> equalAuthoritiesMap) {
- return rows ->
- Iterators.transform(
- rows,
- row -> {
- String location = row.getString(0);
- URI uri = new Path(location).toUri();
- String scheme = equalSchemesMap.getOrDefault(uri.getScheme(), uri.getScheme());
- String authority =
- equalAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
- return new FileMetadata(scheme, authority, uri.getPath(), location);
- });
+ private abstract static class ToFileURI<I> implements MapPartitionsFunction<I, FileURI> {
+
+ private final Map<String, String> equalSchemes;
+ private final Map<String, String> equalAuthorities;
+
+ ToFileURI(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
+ this.equalSchemes = equalSchemes;
+ this.equalAuthorities = equalAuthorities;
+ }
+
+ protected abstract String uriAsString(I input);
+
+ Dataset<FileURI> apply(Dataset<I> ds) {
+ return ds.mapPartitions(this, FileURI.ENCODER);
+ }
+
+ @Override
+ public Iterator<FileURI> call(Iterator<I> rows) throws Exception {
+ return Iterators.transform(rows, this::toFileURI);
+ }
+
+ private FileURI toFileURI(I input) {
+ String uriAsString = uriAsString(input);
+ URI uri = new Path(uriAsString).toUri();
+ String scheme = equalSchemes.getOrDefault(uri.getScheme(), uri.getScheme());
+ String authority = equalAuthorities.getOrDefault(uri.getAuthority(), uri.getAuthority());
+ return new FileURI(scheme, authority, uri.getPath(), uriAsString);
+ }
}
/**
@@ -503,9 +577,11 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
@Override
public boolean accept(Path path) {
- boolean isHiddenPartitionPath =
- hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
- return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
+ return isHiddenPartitionPath(path) || HiddenPathFilter.get().accept(path);
+ }
+
+ private boolean isHiddenPartitionPath(Path path) {
+ return hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
}
static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
@@ -517,33 +593,34 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
specs.values().stream()
.map(PartitionSpec::fields)
.flatMap(List::stream)
- .filter(
- partitionField ->
- partitionField.name().startsWith("_")
- || partitionField.name().startsWith("."))
- .map(partitionField -> partitionField.name() + "=")
+ .filter(field -> field.name().startsWith("_") || field.name().startsWith("."))
+ .map(field -> field.name() + "=")
.collect(Collectors.toSet());
- return partitionNames.isEmpty()
- ? HiddenPathFilter.get()
- : new PartitionAwareHiddenPathFilter(partitionNames);
+ if (partitionNames.isEmpty()) {
+ return HiddenPathFilter.get();
+ } else {
+ return new PartitionAwareHiddenPathFilter(partitionNames);
+ }
}
}
- public static class FileMetadata implements Serializable {
+ public static class FileURI {
+ public static final Encoder<FileURI> ENCODER = Encoders.bean(FileURI.class);
+
private String scheme;
private String authority;
private String path;
- private String location;
+ private String uriAsString;
- public FileMetadata(String scheme, String authority, String path, String location) {
+ public FileURI(String scheme, String authority, String path, String uriAsString) {
this.scheme = scheme;
this.authority = authority;
this.path = path;
- this.location = location;
+ this.uriAsString = uriAsString;
}
- public FileMetadata() {}
+ public FileURI() {}
public void setScheme(String scheme) {
this.scheme = scheme;
@@ -557,8 +634,8 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
this.path = path;
}
- public void setLocation(String location) {
- this.location = location;
+ public void setUriAsString(String uriAsString) {
+ this.uriAsString = uriAsString;
}
public String getScheme() {
@@ -573,8 +650,8 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
return path;
}
- public String getLocation() {
- return location;
+ public String getUriAsString() {
+ return uriAsString;
}
}
}
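(For context, end users drive the action changed above through the SparkActions facade. A minimal invocation sketch, assuming an iceberg-spark-runtime build for Spark 3.3 on the classpath; RemoveOrphansExample and the three-day cutoff are purely illustrative:)

    import java.util.concurrent.TimeUnit;

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.DeleteOrphanFiles;
    import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class RemoveOrphansExample {

      // `spark` and `table` come from the caller; loading them is out of scope here.
      static DeleteOrphanFiles.Result removeOrphans(SparkSession spark, Table table) {
        long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);

        return SparkActions.get(spark)
            .deleteOrphanFiles(table)
            .olderThan(cutoff) // only consider files last modified before the cutoff
            .prefixMismatchMode(PrefixMismatchMode.ERROR) // fail on scheme/authority conflicts
            .execute();
      }
    }

(The returned Result exposes orphanFileLocations() with everything that was deleted, as the BaseDeleteOrphanFilesActionResult construction above shows.)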
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 46e6455190..b47f367dde 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -33,7 +33,6 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -222,8 +221,7 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
}
}
- return String.format(
- "Expiring snapshots (%s) in %s", Joiner.on(',').join(options), table.name());
+ return String.format("Expiring snapshots (%s) in %s", COMMA_JOINER.join(options), table.name());
}
private ExpireSnapshots.Result doExecute() {
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 20f995731e..10a33e836d 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.StringToFileURI;
import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
@@ -986,12 +987,15 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
Map<String, String> equalSchemes,
Map<String, String> equalAuthorities,
DeleteOrphanFiles.PrefixMismatchMode mode) {
- Dataset<Row> validFilesDF = spark.createDataset(validFiles, Encoders.STRING()).toDF();
- Dataset<Row> actualFilesDF = spark.createDataset(actualFiles, Encoders.STRING()).toDF();
+
+ StringToFileURI toFileUri = new StringToFileURI(equalSchemes, equalAuthorities);
+
+ Dataset<String> validFileDS = spark.createDataset(validFiles, Encoders.STRING());
+ Dataset<String> actualFileDS = spark.createDataset(actualFiles, Encoders.STRING());
List<String> orphanFiles =
DeleteOrphanFilesSparkAction.findOrphanFiles(
- spark, actualFilesDF, validFilesDF, equalSchemes, equalAuthorities, mode);
+ spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
Assert.assertEquals(expectedOrphanFiles, orphanFiles);
}
}