Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/07/12 23:00:54 UTC
[iceberg] branch master updated: Spark 3.2: Expose action classes in SparkActions (#5261)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6d2edd6284 Spark 3.2: Expose action classes in SparkActions (#5261)
6d2edd6284 is described below
commit 6d2edd6284ebc5301dbe45376a31ca8316852a77
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Jul 12 16:00:50 2022 -0700
Spark 3.2: Expose action classes in SparkActions (#5261)
---
.../actions/BaseDeleteOrphanFilesSparkAction.java | 318 +-------------
.../BaseDeleteReachableFilesSparkAction.java | 158 +------
.../actions/BaseExpireSnapshotsSparkAction.java | 228 +---------
.../spark/actions/BaseMigrateTableSparkAction.java | 189 +--------
.../actions/BaseRewriteDataFilesSparkAction.java | 459 +--------------------
.../actions/BaseRewriteManifestsSparkAction.java | 348 +---------------
.../actions/BaseSnapshotTableSparkAction.java | 179 +-------
.../actions/BaseSnapshotUpdateSparkAction.java | 5 +-
.../iceberg/spark/actions/BaseSparkAction.java | 5 +-
.../actions/BaseTableCreationSparkAction.java | 2 +-
...tion.java => DeleteOrphanFilesSparkAction.java} | 20 +-
...n.java => DeleteReachableFilesSparkAction.java} | 16 +-
...Action.java => ExpireSnapshotsSparkAction.java} | 20 +-
...arkAction.java => MigrateTableSparkAction.java} | 14 +-
...ction.java => RewriteDataFilesSparkAction.java} | 21 +-
...ction.java => RewriteManifestsSparkAction.java} | 20 +-
...rkAction.java => SnapshotTableSparkAction.java} | 18 +-
.../apache/iceberg/spark/actions/SparkActions.java | 21 +-
.../spark/procedures/ExpireSnapshotsProcedure.java | 4 +-
.../procedures/RemoveOrphanFilesProcedure.java | 6 +-
.../procedures/RewriteManifestsProcedure.java | 5 +-
.../spark/actions/TestExpireSnapshotsAction.java | 2 +-
.../spark/actions/TestRemoveOrphanFilesAction.java | 8 +-
.../spark/actions/TestRewriteDataFilesAction.java | 2 +-
24 files changed, 148 insertions(+), 1920 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
index dc58a05d4d..7cbcf164d4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
@@ -19,346 +19,52 @@
package org.apache.iceberg.spark.actions;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
-import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HiddenPathFilter;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
/**
- * An action that removes orphan metadata, data and delete files by listing a given location and comparing
- * the actual files in that location with content and metadata files referenced by all valid snapshots.
- * The location must be accessible for listing via the Hadoop {@link FileSystem}.
- * <p>
- * By default, this action cleans up the table location returned by {@link Table#location()} and
- * removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified
- * by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}.
- * For example, someone might point this action to the data folder to clean up only orphan data files.
- * <p>
- * Configure an alternative delete method using {@link #deleteWith(Consumer)}.
- * <p>
- * For full control of the set of files being evaluated, use the {@link #compareToFileList(Dataset)} argument. This
- * skips the directory listing - any files in the dataset provided which are not found in table metadata will
- * be deleted, using the same {@link Table#location()} and {@link #olderThan(long)} filtering as above.
- * <p>
- * <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
- * the state of the table if another operation is writing at the same time.
+ * An action to delete orphan files.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ * use {@link SparkActions} and {@link DeleteOrphanFilesSparkAction} instead.
*/
-public class BaseDeleteOrphanFilesSparkAction
- extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
-
- private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
- private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
- int lastIndex = path.lastIndexOf(File.separator);
- if (lastIndex == -1) {
- return path;
- } else {
- return path.substring(lastIndex + 1);
- }
- }, DataTypes.StringType);
-
- private final SerializableConfiguration hadoopConf;
- private final int partitionDiscoveryParallelism;
- private final Table table;
- private final Consumer<String> defaultDelete = new Consumer<String>() {
- @Override
- public void accept(String file) {
- table.io().deleteFile(file);
- }
- };
-
- private String location = null;
- private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
- private Dataset<Row> compareToFileList;
- private Consumer<String> deleteFunc = defaultDelete;
- private ExecutorService deleteExecutorService = null;
+@Deprecated
+public class BaseDeleteOrphanFilesSparkAction extends DeleteOrphanFilesSparkAction {
public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
- super(spark);
-
- this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
- this.partitionDiscoveryParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
- this.table = table;
- this.location = table.location();
-
- ValidationException.check(
- PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
- "Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
- }
-
- @Override
- protected DeleteOrphanFiles self() {
- return this;
+ super(spark, table);
}
@Override
public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
- this.deleteExecutorService = executorService;
+ super.executeDeleteWith(executorService);
return this;
}
@Override
public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
- this.location = newLocation;
+ super.location(newLocation);
return this;
}
@Override
public BaseDeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
- this.olderThanTimestamp = newOlderThanTimestamp;
+ super.olderThan(newOlderThanTimestamp);
return this;
}
@Override
public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
- this.deleteFunc = newDeleteFunc;
+ super.deleteWith(newDeleteFunc);
return this;
}
public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
- StructType schema = files.schema();
-
- StructField filePathField = schema.apply(FILE_PATH);
- Preconditions.checkArgument(
- filePathField.dataType() == DataTypes.StringType,
- "Invalid %s column: %s is not a string",
- FILE_PATH,
- filePathField.dataType());
-
- StructField lastModifiedField = schema.apply(LAST_MODIFIED);
- Preconditions.checkArgument(
- lastModifiedField.dataType() == DataTypes.TimestampType,
- "Invalid %s column: %s is not a timestamp",
- LAST_MODIFIED,
- lastModifiedField.dataType());
-
- this.compareToFileList = files;
+ super.compareToFileList(files);
return this;
}
-
- private Dataset<Row> filteredCompareToFileList() {
- Dataset<Row> files = compareToFileList;
- if (location != null) {
- files = files.filter(files.col(FILE_PATH).startsWith(location));
- }
- return files
- .filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp)))
- .select(files.col(FILE_PATH));
- }
-
- @Override
- public DeleteOrphanFiles.Result execute() {
- JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc());
- return withJobGroupInfo(info, this::doExecute);
- }
-
- private String jobDesc() {
- List<String> options = Lists.newArrayList();
- options.add("older_than=" + olderThanTimestamp);
- if (location != null) {
- options.add("location=" + location);
- }
- return String.format("Deleting orphan files (%s) from %s", Joiner.on(',').join(options), table.name());
- }
-
- private DeleteOrphanFiles.Result doExecute() {
- Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
- Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
- Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
- Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
-
- Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
- Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
- Column nameEqual = actualFileName.equalTo(validFileName);
- Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
- Column joinCond = nameEqual.and(actualContains);
- List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
- .as(Encoders.STRING())
- .collectAsList();
-
- Tasks.foreach(orphanFiles)
- .noRetry()
- .executeWith(deleteExecutorService)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
- .run(deleteFunc::accept);
-
- return new BaseDeleteOrphanFilesActionResult(orphanFiles);
- }
-
- private Dataset<Row> buildActualFileDF() {
- List<String> subDirs = Lists.newArrayList();
- List<String> matchingFiles = Lists.newArrayList();
-
- Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
- PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
-
- // list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver
- listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, pathFilter, matchingFiles);
-
- JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
-
- if (subDirs.isEmpty()) {
- return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
- }
-
- int parallelism = Math.min(subDirs.size(), partitionDiscoveryParallelism);
- JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);
-
- Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
- JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(
- listDirsRecursively(conf, olderThanTimestamp, pathFilter)
- );
-
- JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
- return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
- }
-
- private static void listDirRecursively(
- String dir, Predicate<FileStatus> predicate, Configuration conf, int maxDepth,
- int maxDirectSubDirs, List<String> remainingSubDirs, PathFilter pathFilter, List<String> matchingFiles) {
-
- // stop listing whenever we reach the max depth
- if (maxDepth <= 0) {
- remainingSubDirs.add(dir);
- return;
- }
-
- try {
- Path path = new Path(dir);
- FileSystem fs = path.getFileSystem(conf);
-
- List<String> subDirs = Lists.newArrayList();
-
- for (FileStatus file : fs.listStatus(path, pathFilter)) {
- if (file.isDirectory()) {
- subDirs.add(file.getPath().toString());
- } else if (file.isFile() && predicate.test(file)) {
- matchingFiles.add(file.getPath().toString());
- }
- }
-
- // stop listing if the number of direct sub dirs is bigger than maxDirectSubDirs
- if (subDirs.size() > maxDirectSubDirs) {
- remainingSubDirs.addAll(subDirs);
- return;
- }
-
- for (String subDir : subDirs) {
- listDirRecursively(
- subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, pathFilter, matchingFiles);
- }
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
- }
-
- private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
- Broadcast<SerializableConfiguration> conf,
- long olderThanTimestamp,
- PathFilter pathFilter) {
-
- return dirs -> {
- List<String> subDirs = Lists.newArrayList();
- List<String> files = Lists.newArrayList();
-
- Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
-
- int maxDepth = 2000;
- int maxDirectSubDirs = Integer.MAX_VALUE;
-
- dirs.forEachRemaining(dir -> {
- listDirRecursively(
- dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, pathFilter, files);
- });
-
- if (!subDirs.isEmpty()) {
- throw new RuntimeException("Could not list subdirectories, reached maximum subdirectory depth: " + maxDepth);
- }
-
- return files.iterator();
- };
- }
-
- /**
- * A {@link PathFilter} that filters out hidden path, but does not filter out paths that would be marked
- * as hidden by {@link HiddenPathFilter} due to a partition field that starts with one of the characters that
- * indicate a hidden path.
- */
- @VisibleForTesting
- static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {
-
- private final Set<String> hiddenPathPartitionNames;
-
- PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
- this.hiddenPathPartitionNames = hiddenPathPartitionNames;
- }
-
- @Override
- public boolean accept(Path path) {
- boolean isHiddenPartitionPath = hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
- return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
- }
-
- static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
- if (specs == null) {
- return HiddenPathFilter.get();
- }
-
- Set<String> partitionNames = specs.values().stream()
- .map(PartitionSpec::fields)
- .flatMap(List::stream)
- .filter(partitionField -> partitionField.name().startsWith("_") || partitionField.name().startsWith("."))
- .map(partitionField -> partitionField.name() + "=")
- .collect(Collectors.toSet());
-
- return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
- }
- }
}
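The deprecated wrapper above now only forwards to the new public DeleteOrphanFilesSparkAction, and its javadoc points callers at SparkActions instead. A minimal usage sketch of that replacement path, assuming an already-loaded Iceberg Table; the class name and the three-day cutoff below are illustrative, not part of the commit:

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.SparkActions;

class OrphanFileCleanupExample {
  // Deletes files under the table location that no valid snapshot references
  // and that are older than three days (the default retention noted in the removed javadoc).
  static DeleteOrphanFiles.Result removeOrphans(Table table) {
    return SparkActions.get()
        .deleteOrphanFiles(table)
        .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
        .execute();
  }
}

The returned Result lists the removed paths via orphanFileLocations().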
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
index fc2a5fa5cf..0c69e4e1e6 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
@@ -19,165 +19,21 @@
package org.apache.iceberg.spark.actions;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableMetadataParser;
-import org.apache.iceberg.actions.BaseDeleteReachableFilesActionResult;
-import org.apache.iceberg.actions.DeleteReachableFiles;
-import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
/**
- * An implementation of {@link DeleteReachableFiles} that uses metadata tables in Spark
- * to determine which files should be deleted.
+ * An action that deletes reachable files from a given metadata file.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ * use {@link SparkActions} and {@link DeleteReachableFilesSparkAction} instead.
*/
-@SuppressWarnings("UnnecessaryAnonymousClass")
-public class BaseDeleteReachableFilesSparkAction
- extends BaseSparkAction<DeleteReachableFiles, DeleteReachableFiles.Result> implements DeleteReachableFiles {
+@Deprecated
+public class BaseDeleteReachableFilesSparkAction extends DeleteReachableFilesSparkAction {
public static final String STREAM_RESULTS = "stream-results";
public static final boolean STREAM_RESULTS_DEFAULT = false;
- private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);
-
- private final String metadataFileLocation;
- private final Consumer<String> defaultDelete = new Consumer<String>() {
- @Override
- public void accept(String file) {
- io.deleteFile(file);
- }
- };
-
- private Consumer<String> deleteFunc = defaultDelete;
- private ExecutorService deleteExecutorService = null;
- private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());
-
public BaseDeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
- super(spark);
- this.metadataFileLocation = metadataFileLocation;
- }
-
- @Override
- protected DeleteReachableFiles self() {
- return this;
- }
-
- @Override
- public DeleteReachableFiles io(FileIO fileIO) {
- this.io = fileIO;
- return this;
- }
-
- @Override
- public DeleteReachableFiles deleteWith(Consumer<String> newDeleteFunc) {
- this.deleteFunc = newDeleteFunc;
- return this;
- }
-
- @Override
- public DeleteReachableFiles executeDeleteWith(ExecutorService executorService) {
- this.deleteExecutorService = executorService;
- return this;
- }
-
- @Override
- public Result execute() {
- Preconditions.checkArgument(io != null, "File IO cannot be null");
- String jobDesc = String.format("Deleting files reachable from %s", metadataFileLocation);
- JobGroupInfo info = newJobGroupInfo("DELETE-REACHABLE-FILES", jobDesc);
- return withJobGroupInfo(info, this::doExecute);
- }
-
- private Result doExecute() {
- TableMetadata metadata = TableMetadataParser.read(io, metadataFileLocation);
-
- ValidationException.check(
- PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
- "Cannot delete files: GC is disabled (deleting files may corrupt other tables)");
-
- Dataset<Row> reachableFileDF = buildReachableFileDF(metadata).distinct();
-
- boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
- if (streamResults) {
- return deleteFiles(reachableFileDF.toLocalIterator());
- } else {
- return deleteFiles(reachableFileDF.collectAsList().iterator());
- }
- }
-
- private Dataset<Row> buildReachableFileDF(TableMetadata metadata) {
- Table staticTable = newStaticTable(metadata, io);
- return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
- .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
- .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST))
- .union(withFileType(buildAllReachableOtherMetadataFileDF(staticTable), OTHERS));
- }
-
- /**
- * Deletes files passed to it.
- *
- * @param deleted an Iterator of Spark Rows of the structure (path: String, type: String)
- * @return Statistics on which files were deleted
- */
- private BaseDeleteReachableFilesActionResult deleteFiles(Iterator<Row> deleted) {
- AtomicLong dataFileCount = new AtomicLong(0L);
- AtomicLong manifestCount = new AtomicLong(0L);
- AtomicLong manifestListCount = new AtomicLong(0L);
- AtomicLong otherFilesCount = new AtomicLong(0L);
-
- Tasks.foreach(deleted)
- .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
- .executeWith(deleteExecutorService)
- .onFailure((fileInfo, exc) -> {
- String file = fileInfo.getString(0);
- String type = fileInfo.getString(1);
- LOG.warn("Delete failed for {}: {}", type, file, exc);
- })
- .run(fileInfo -> {
- String file = fileInfo.getString(0);
- String type = fileInfo.getString(1);
- deleteFunc.accept(file);
- switch (type) {
- case CONTENT_FILE:
- dataFileCount.incrementAndGet();
- LOG.trace("Deleted Content File: {}", file);
- break;
- case MANIFEST:
- manifestCount.incrementAndGet();
- LOG.debug("Deleted Manifest: {}", file);
- break;
- case MANIFEST_LIST:
- manifestListCount.incrementAndGet();
- LOG.debug("Deleted Manifest List: {}", file);
- break;
- case OTHERS:
- otherFilesCount.incrementAndGet();
- LOG.debug("Others: {}", file);
- break;
- }
- });
-
- long filesCount = dataFileCount.get() + manifestCount.get() + manifestListCount.get() + otherFilesCount.get();
- LOG.info("Total files removed: {}", filesCount);
- return new BaseDeleteReachableFilesActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get(),
- otherFilesCount.get());
+ super(spark, metadataFileLocation);
}
}
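The migration path for the reachable-files action is the same: go through SparkActions rather than the deprecated Base class. A hedged sketch, assuming an active SparkSession and the location of a metadata.json for a table that no longer exists in the catalog; the helper class, the FileIO choice, and the option value are illustrative (stream-results is the STREAM_RESULTS option retained in the class above):

import org.apache.iceberg.actions.DeleteReachableFiles;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

class ReachableFileCleanupExample {
  // Removes every file reachable from the given metadata.json, e.g. for a table
  // that was dropped from the catalog but whose files are still on disk.
  static DeleteReachableFiles.Result purge(SparkSession spark, String metadataFileLocation) {
    return SparkActions.get(spark)
        .deleteReachableFiles(metadataFileLocation)
        .io(new HadoopFileIO(spark.sessionState().newHadoopConf()))
        .option("stream-results", "true") // iterate over results instead of collecting them on the driver
        .execute();
  }
}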
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
index 2216383337..cc70279f39 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
@@ -19,258 +19,54 @@
package org.apache.iceberg.spark.actions;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
-import org.apache.iceberg.actions.ExpireSnapshots;
-import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
/**
- * An action that performs the same operation as {@link org.apache.iceberg.ExpireSnapshots} but uses Spark
- * to determine the delta in files between the pre and post-expiration table metadata. All of the same
- * restrictions of {@link org.apache.iceberg.ExpireSnapshots} also apply to this action.
- * <p>
- * This action first leverages {@link org.apache.iceberg.ExpireSnapshots} to expire snapshots and then
- * uses metadata tables to find files that can be safely deleted. This is done by anti-joining two Datasets
- * that contain all manifest and content files before and after the expiration. The snapshot expiration
- * will be fully committed before any deletes are issued.
- * <p>
- * This operation performs a shuffle so the parallelism can be controlled through 'spark.sql.shuffle.partitions'.
- * <p>
- * Deletes are still performed locally after retrieving the results from the Spark executors.
+ * An action to expire snapshots.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ * use {@link SparkActions} and {@link ExpireSnapshotsSparkAction} instead.
*/
-@SuppressWarnings("UnnecessaryAnonymousClass")
-public class BaseExpireSnapshotsSparkAction
- extends BaseSparkAction<ExpireSnapshots, ExpireSnapshots.Result> implements ExpireSnapshots {
+@Deprecated
+public class BaseExpireSnapshotsSparkAction extends ExpireSnapshotsSparkAction {
public static final String STREAM_RESULTS = "stream-results";
public static final boolean STREAM_RESULTS_DEFAULT = false;
- private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);
-
- private final Table table;
- private final TableOperations ops;
- private final Consumer<String> defaultDelete = new Consumer<String>() {
- @Override
- public void accept(String file) {
- ops.io().deleteFile(file);
- }
- };
-
- private final Set<Long> expiredSnapshotIds = Sets.newHashSet();
- private Long expireOlderThanValue = null;
- private Integer retainLastValue = null;
- private Consumer<String> deleteFunc = defaultDelete;
- private ExecutorService deleteExecutorService = null;
- private Dataset<Row> expiredFiles = null;
-
public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) {
- super(spark);
- this.table = table;
- this.ops = ((HasTableOperations) table).operations();
-
- ValidationException.check(
- PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
- "Cannot expire snapshots: GC is disabled (deleting files may corrupt other tables)");
- }
-
- @Override
- protected ExpireSnapshots self() {
- return this;
+ super(spark, table);
}
@Override
public BaseExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorService) {
- this.deleteExecutorService = executorService;
+ super.executeDeleteWith(executorService);
return this;
}
@Override
public BaseExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) {
- expiredSnapshotIds.add(snapshotId);
+ super.expireSnapshotId(snapshotId);
return this;
}
@Override
public BaseExpireSnapshotsSparkAction expireOlderThan(long timestampMillis) {
- this.expireOlderThanValue = timestampMillis;
+ super.expireOlderThan(timestampMillis);
return this;
}
@Override
public BaseExpireSnapshotsSparkAction retainLast(int numSnapshots) {
- Preconditions.checkArgument(1 <= numSnapshots,
- "Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
- this.retainLastValue = numSnapshots;
+ super.retainLast(numSnapshots);
return this;
}
@Override
public BaseExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
- this.deleteFunc = newDeleteFunc;
+ super.deleteWith(newDeleteFunc);
return this;
}
-
- /**
- * Expires snapshots and commits the changes to the table, returning a Dataset of files to delete.
- * <p>
- * This does not delete data files. To delete data files, run {@link #execute()}.
- * <p>
- * This may be called before or after {@link #execute()} is called to return the expired file list.
- *
- * @return a Dataset of files that are no longer referenced by the table
- */
- public Dataset<Row> expire() {
- if (expiredFiles == null) {
- // fetch metadata before expiration
- Dataset<Row> originalFiles = buildValidFileDF(ops.current());
-
- // perform expiration
- org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots().cleanExpiredFiles(false);
- for (long id : expiredSnapshotIds) {
- expireSnapshots = expireSnapshots.expireSnapshotId(id);
- }
-
- if (expireOlderThanValue != null) {
- expireSnapshots = expireSnapshots.expireOlderThan(expireOlderThanValue);
- }
-
- if (retainLastValue != null) {
- expireSnapshots = expireSnapshots.retainLast(retainLastValue);
- }
-
- expireSnapshots.commit();
-
- // fetch metadata after expiration
- Dataset<Row> validFiles = buildValidFileDF(ops.refresh());
-
- // determine expired files
- this.expiredFiles = originalFiles.except(validFiles);
- }
-
- return expiredFiles;
- }
-
- @Override
- public ExpireSnapshots.Result execute() {
- JobGroupInfo info = newJobGroupInfo("EXPIRE-SNAPSHOTS", jobDesc());
- return withJobGroupInfo(info, this::doExecute);
- }
-
- private String jobDesc() {
- List<String> options = Lists.newArrayList();
-
- if (expireOlderThanValue != null) {
- options.add("older_than=" + expireOlderThanValue);
- }
-
- if (retainLastValue != null) {
- options.add("retain_last=" + retainLastValue);
- }
-
- if (!expiredSnapshotIds.isEmpty()) {
- Long first = expiredSnapshotIds.stream().findFirst().get();
- if (expiredSnapshotIds.size() > 1) {
- options.add(String.format("snapshot_ids: %s (%s more...)", first, expiredSnapshotIds.size() - 1));
- } else {
- options.add(String.format("snapshot_id: %s", first));
- }
- }
-
- return String.format("Expiring snapshots (%s) in %s", Joiner.on(',').join(options), table.name());
- }
-
- private ExpireSnapshots.Result doExecute() {
- boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
- if (streamResults) {
- return deleteFiles(expire().toLocalIterator());
- } else {
- return deleteFiles(expire().collectAsList().iterator());
- }
- }
-
- private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
- Table staticTable = newStaticTable(metadata, table.io());
- return buildValidContentFileWithTypeDF(staticTable)
- .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
- .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
- }
-
- /**
- * Deletes files passed to it based on their type.
- *
- * @param expired an Iterator of Spark Rows of the structure (path: String, type: String)
- * @return Statistics on which files were deleted
- */
- private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
- AtomicLong dataFileCount = new AtomicLong(0L);
- AtomicLong posDeleteFileCount = new AtomicLong(0L);
- AtomicLong eqDeleteFileCount = new AtomicLong(0L);
- AtomicLong manifestCount = new AtomicLong(0L);
- AtomicLong manifestListCount = new AtomicLong(0L);
-
- Tasks.foreach(expired)
- .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
- .executeWith(deleteExecutorService)
- .onFailure((fileInfo, exc) -> {
- String file = fileInfo.getString(0);
- String type = fileInfo.getString(1);
- LOG.warn("Delete failed for {}: {}", type, file, exc);
- })
- .run(fileInfo -> {
- String file = fileInfo.getString(0);
- String type = fileInfo.getString(1);
- deleteFunc.accept(file);
-
- if (FileContent.DATA.name().equalsIgnoreCase(type)) {
- dataFileCount.incrementAndGet();
- LOG.trace("Deleted Data File: {}", file);
- } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
- posDeleteFileCount.incrementAndGet();
- LOG.trace("Deleted Positional Delete File: {}", file);
- } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
- eqDeleteFileCount.incrementAndGet();
- LOG.trace("Deleted Equality Delete File: {}", file);
- } else if (MANIFEST.equals(type)) {
- manifestCount.incrementAndGet();
- LOG.debug("Deleted Manifest: {}", file);
- } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
- manifestListCount.incrementAndGet();
- LOG.debug("Deleted Manifest List: {}", file);
- } else {
- throw new ValidationException("Illegal file type: %s", type);
- }
- });
-
- long contentFileCount = dataFileCount.get() + posDeleteFileCount.get() + eqDeleteFileCount.get();
- LOG.info("Deleted {} total files", contentFileCount + manifestCount.get() + manifestListCount.get());
-
- return new BaseExpireSnapshotsActionResult(dataFileCount.get(), posDeleteFileCount.get(),
- eqDeleteFileCount.get(), manifestCount.get(), manifestListCount.get());
- }
}
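Likewise, snapshot expiration is now meant to be driven through SparkActions and ExpireSnapshotsSparkAction. A minimal sketch, assuming a loaded Table; the seven-day cutoff and retain count are illustrative:

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.spark.actions.SparkActions;

class SnapshotExpirationExample {
  // Expires snapshots older than a week while keeping at least the last five,
  // then deletes the content, manifest, and manifest list files only they referenced.
  static ExpireSnapshots.Result expire(Table table) {
    return SparkActions.get()
        .expireSnapshots(table)
        .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
        .retainLast(5)
        .execute();
  }
}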
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
index ef9f0d3e25..78a6fa20f5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
@@ -19,196 +19,19 @@
package org.apache.iceberg.spark.actions;
-import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseMigrateTableActionResult;
-import org.apache.iceberg.actions.MigrateTable;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Some;
-import scala.collection.JavaConverters;
/**
- * Takes a Spark table in the source catalog and attempts to transform it into an Iceberg
- * table in the same location with the same identifier. Once complete the identifier which
- * previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
- * table.
+ * An action to migrate a table to Iceberg.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ * use {@link SparkActions} and {@link MigrateTableSparkAction} instead.
*/
-public class BaseMigrateTableSparkAction
- extends BaseTableCreationSparkAction<MigrateTable, MigrateTable.Result>
- implements MigrateTable {
-
- private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
- private static final String BACKUP_SUFFIX = "_BACKUP_";
-
- private final StagingTableCatalog destCatalog;
- private final Identifier destTableIdent;
- private final Identifier backupIdent;
-
+@Deprecated
+public class BaseMigrateTableSparkAction extends MigrateTableSparkAction {
public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
super(spark, sourceCatalog, sourceTableIdent);
- this.destCatalog = checkDestinationCatalog(sourceCatalog);
- this.destTableIdent = sourceTableIdent;
- String backupName = sourceTableIdent.name() + BACKUP_SUFFIX;
- this.backupIdent = Identifier.of(sourceTableIdent.namespace(), backupName);
- }
-
- @Override
- protected MigrateTable self() {
- return this;
- }
-
- @Override
- protected StagingTableCatalog destCatalog() {
- return destCatalog;
- }
-
- @Override
- protected Identifier destTableIdent() {
- return destTableIdent;
- }
-
- @Override
- public MigrateTable tableProperties(Map<String, String> properties) {
- setProperties(properties);
- return this;
- }
-
- @Override
- public MigrateTable tableProperty(String property, String value) {
- setProperty(property, value);
- return this;
- }
-
- @Override
- public MigrateTable.Result execute() {
- String desc = String.format("Migrating table %s", destTableIdent().toString());
- JobGroupInfo info = newJobGroupInfo("MIGRATE-TABLE", desc);
- return withJobGroupInfo(info, this::doExecute);
- }
-
- private MigrateTable.Result doExecute() {
- LOG.info("Starting the migration of {} to Iceberg", sourceTableIdent());
-
- // move the source table to a new name, halting all modifications and allowing us to stage
- // the creation of a new Iceberg table in its place
- renameAndBackupSourceTable();
-
- StagedSparkTable stagedTable = null;
- Table icebergTable;
- boolean threw = true;
- try {
- LOG.info("Staging a new Iceberg table {}", destTableIdent());
- stagedTable = stageDestTable();
- icebergTable = stagedTable.table();
-
- LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
- ensureNameMappingPresent(icebergTable);
-
- Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
- TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
- String stagingLocation = getMetadataLocation(icebergTable);
- LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
- SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable, stagingLocation);
-
- LOG.info("Committing staged changes to {}", destTableIdent());
- stagedTable.commitStagedChanges();
- threw = false;
- } finally {
- if (threw) {
- LOG.error("Failed to perform the migration, aborting table creation and restoring the original table");
-
- restoreSourceTable();
-
- if (stagedTable != null) {
- try {
- stagedTable.abortStagedChanges();
- } catch (Exception abortException) {
- LOG.error("Cannot abort staged changes", abortException);
- }
- }
- }
- }
-
- Snapshot snapshot = icebergTable.currentSnapshot();
- long migratedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
- LOG.info("Successfully loaded Iceberg metadata for {} files to {}", migratedDataFilesCount, destTableIdent());
- return new BaseMigrateTableActionResult(migratedDataFilesCount);
- }
-
- @Override
- protected Map<String, String> destTableProps() {
- Map<String, String> properties = Maps.newHashMap();
-
- // copy over relevant source table props
- properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
- EXCLUDED_PROPERTIES.forEach(properties::remove);
-
- // set default and user-provided props
- properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
- properties.putAll(additionalProperties());
-
- // make sure we mark this table as migrated
- properties.put("migrated", "true");
-
- // inherit the source table location
- properties.putIfAbsent(LOCATION, sourceTableLocation());
-
- return properties;
- }
-
- @Override
- protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
- // currently the import code relies on being able to look up the table in the session catalog
- Preconditions.checkArgument(catalog instanceof SparkSessionCatalog,
- "Cannot migrate a table from a non-Iceberg Spark Session Catalog. Found %s of class %s as the source catalog.",
- catalog.name(), catalog.getClass().getName());
-
- return (TableCatalog) catalog;
- }
-
- private void renameAndBackupSourceTable() {
- try {
- LOG.info("Renaming {} as {} for backup", sourceTableIdent(), backupIdent);
- destCatalog().renameTable(sourceTableIdent(), backupIdent);
-
- } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
- throw new NoSuchTableException("Cannot find source table %s", sourceTableIdent());
-
- } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
- throw new AlreadyExistsException(
- "Cannot rename %s as %s for backup. The backup table already exists.",
- sourceTableIdent(), backupIdent);
- }
- }
-
- private void restoreSourceTable() {
- try {
- LOG.info("Restoring {} from {}", sourceTableIdent(), backupIdent);
- destCatalog().renameTable(backupIdent, sourceTableIdent());
-
- } catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
- LOG.error("Cannot restore the original table, the backup table {} cannot be found", backupIdent, e);
-
- } catch (org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException e) {
- LOG.error("Cannot restore the original table, a table with the original name exists. " +
- "Use the backup table {} to restore the original table manually.", backupIdent, e);
- }
}
}
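The migrate action follows the same pattern: the deprecated class only delegates to MigrateTableSparkAction, and new code should go through SparkActions.migrateTable. A sketch under the assumption that the identifier names a table in a Spark session catalog; the helper class and the extra table property are illustrative:

import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.spark.actions.SparkActions;

class TableMigrationExample {
  // Converts an existing Spark session-catalog table in place into an Iceberg table,
  // keeping the original as a backup until the migration commits.
  static long migrate(String tableIdent) {
    MigrateTable.Result result = SparkActions.get()
        .migrateTable(tableIdent)
        .tableProperty("migration-note", "converted by nightly job") // illustrative custom property
        .execute();
    return result.migratedDataFilesCount();
  }
}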
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index d1937fcbcd..d23a367fbd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -19,459 +19,18 @@
package org.apache.iceberg.spark.actions;
-import java.io.IOException;
-import java.math.RoundingMode;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.RewriteJobOrder;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseRewriteDataFilesFileGroupInfo;
-import org.apache.iceberg.actions.BaseRewriteDataFilesResult;
-import org.apache.iceberg.actions.BinPackStrategy;
-import org.apache.iceberg.actions.RewriteDataFiles;
-import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
-import org.apache.iceberg.actions.RewriteFileGroup;
-import org.apache.iceberg.actions.RewriteStrategy;
-import org.apache.iceberg.actions.SortStrategy;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.exceptions.CommitFailedException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Queues;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.math.IntMath;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.StructLikeMap;
-import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BaseRewriteDataFilesSparkAction
- extends BaseSnapshotUpdateSparkAction<RewriteDataFiles, RewriteDataFiles.Result> implements RewriteDataFiles {
-
- private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesSparkAction.class);
- private static final Set<String> VALID_OPTIONS = ImmutableSet.of(
- MAX_CONCURRENT_FILE_GROUP_REWRITES,
- MAX_FILE_GROUP_SIZE_BYTES,
- PARTIAL_PROGRESS_ENABLED,
- PARTIAL_PROGRESS_MAX_COMMITS,
- TARGET_FILE_SIZE_BYTES,
- USE_STARTING_SEQUENCE_NUMBER,
- REWRITE_JOB_ORDER
- );
-
- private final Table table;
-
- private Expression filter = Expressions.alwaysTrue();
- private int maxConcurrentFileGroupRewrites;
- private int maxCommits;
- private boolean partialProgressEnabled;
- private boolean useStartingSequenceNumber;
- private RewriteJobOrder rewriteJobOrder;
- private RewriteStrategy strategy = null;
+/**
+ * An action to rewrite data files.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ * use {@link SparkActions} and {@link RewriteDataFilesSparkAction} instead.
+ */
+@Deprecated
+public class BaseRewriteDataFilesSparkAction extends RewriteDataFilesSparkAction {
protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
- super(spark);
- this.table = table;
- }
-
- @Override
- protected RewriteDataFiles self() {
- return this;
- }
-
- @Override
- public RewriteDataFiles binPack() {
- Preconditions.checkArgument(this.strategy == null,
- "Cannot set strategy to binpack, it has already been set", this.strategy);
- this.strategy = binPackStrategy();
- return this;
- }
-
- @Override
- public RewriteDataFiles sort(SortOrder sortOrder) {
- Preconditions.checkArgument(this.strategy == null,
- "Cannot set strategy to sort, it has already been set to %s", this.strategy);
- this.strategy = sortStrategy().sortOrder(sortOrder);
- return this;
- }
-
- @Override
- public RewriteDataFiles sort() {
- Preconditions.checkArgument(this.strategy == null,
- "Cannot set strategy to sort, it has already been set to %s", this.strategy);
- this.strategy = sortStrategy();
- return this;
- }
-
- @Override
- public RewriteDataFiles zOrder(String... columnNames) {
- Preconditions.checkArgument(this.strategy == null,
- "Cannot set strategy to zorder, it has already been set to %s", this.strategy);
- this.strategy = zOrderStrategy(columnNames);
- return this;
- }
-
- @Override
- public RewriteDataFiles filter(Expression expression) {
- filter = Expressions.and(filter, expression);
- return this;
- }
-
- @Override
- public RewriteDataFiles.Result execute() {
- if (table.currentSnapshot() == null) {
- return new BaseRewriteDataFilesResult(ImmutableList.of());
- }
-
- long startingSnapshotId = table.currentSnapshot().snapshotId();
-
- // Default to BinPack if no strategy selected
- if (this.strategy == null) {
- this.strategy = binPackStrategy();
- }
-
- validateAndInitOptions();
-
- Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition = planFileGroups(startingSnapshotId);
- RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
-
- if (ctx.totalGroupCount() == 0) {
- LOG.info("Nothing found to rewrite in {}", table.name());
- return new BaseRewriteDataFilesResult(Collections.emptyList());
- }
-
- Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
-
- RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
- if (partialProgressEnabled) {
- return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
- } else {
- return doExecute(ctx, groupStream, commitManager);
- }
- }
-
- Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
- CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
- .useSnapshot(startingSnapshotId)
- .filter(filter)
- .ignoreResiduals()
- .planFiles();
-
- try {
- StructType partitionType = table.spec().partitionType();
- StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
- StructLike emptyStruct = GenericRecord.create(partitionType);
-
- fileScanTasks.forEach(task -> {
- // If a task uses an incompatible partition spec the data inside could contain values which
- // belong to multiple partitions in the current spec. Treating all such files as un-partitioned and
- // grouping them together helps to minimize new files made.
- StructLike taskPartition = task.file().specId() == table.spec().specId() ?
- task.file().partition() : emptyStruct;
-
- List<FileScanTask> files = filesByPartition.get(taskPartition);
- if (files == null) {
- files = Lists.newArrayList();
- }
-
- files.add(task);
- filesByPartition.put(taskPartition, files);
- });
-
- StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition = StructLikeMap.create(partitionType);
-
- filesByPartition.forEach((partition, tasks) -> {
- Iterable<FileScanTask> filtered = strategy.selectFilesToRewrite(tasks);
- Iterable<List<FileScanTask>> groupedTasks = strategy.planFileGroups(filtered);
- List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(groupedTasks);
- if (fileGroups.size() > 0) {
- fileGroupsByPartition.put(partition, fileGroups);
- }
- });
-
- return fileGroupsByPartition;
- } finally {
- try {
- fileScanTasks.close();
- } catch (IOException io) {
- LOG.error("Cannot properly close file iterable while planning for rewrite", io);
- }
- }
- }
-
- @VisibleForTesting
- RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
- String desc = jobDesc(fileGroup, ctx);
- Set<DataFile> addedFiles = withJobGroupInfo(
- newJobGroupInfo("REWRITE-DATA-FILES", desc),
- () -> strategy.rewriteFiles(fileGroup.fileScans()));
-
- fileGroup.setOutputFiles(addedFiles);
- LOG.info("Rewrite Files Ready to be Committed - {}", desc);
- return fileGroup;
- }
-
- private ExecutorService rewriteService() {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) Executors.newFixedThreadPool(
- maxConcurrentFileGroupRewrites,
- new ThreadFactoryBuilder()
- .setNameFormat("Rewrite-Service-%d")
- .build()));
- }
-
- @VisibleForTesting
- RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
- return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
- }
-
- private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
- RewriteDataFilesCommitManager commitManager) {
- ExecutorService rewriteService = rewriteService();
-
- ConcurrentLinkedQueue<RewriteFileGroup> rewrittenGroups = Queues.newConcurrentLinkedQueue();
-
- Tasks.Builder<RewriteFileGroup> rewriteTaskBuilder = Tasks.foreach(groupStream)
- .executeWith(rewriteService)
- .stopOnFailure()
- .noRetry()
- .onFailure((fileGroup, exception) -> {
- LOG.warn("Failure during rewrite process for group {}", fileGroup.info(), exception);
- });
-
- try {
- rewriteTaskBuilder.run(fileGroup -> {
- rewrittenGroups.add(rewriteFiles(ctx, fileGroup));
- });
- } catch (Exception e) {
- // At least one rewrite group failed, clean up all completed rewrites
- LOG.error("Cannot complete rewrite, {} is not enabled and one of the file set groups failed to " +
- "be rewritten. This error occurred during the writing of new files, not during the commit process. This " +
- "indicates something is wrong that doesn't involve conflicts with other Iceberg operations. Enabling " +
- "{} may help in this case but the root cause should be investigated. Cleaning up {} groups which finished " +
- "being written.", PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_ENABLED, rewrittenGroups.size(), e);
-
- Tasks.foreach(rewrittenGroups)
- .suppressFailureWhenFinished()
- .run(group -> commitManager.abortFileGroup(group));
- throw e;
- } finally {
- rewriteService.shutdown();
- }
-
- try {
- commitManager.commitOrClean(Sets.newHashSet(rewrittenGroups));
- } catch (ValidationException | CommitFailedException e) {
- String errorMessage = String.format(
- "Cannot commit rewrite because of a ValidationException or CommitFailedException. This usually means that " +
- "this rewrite has conflicted with another concurrent Iceberg operation. To reduce the likelihood of " +
- "conflicts, set %s which will break up the rewrite into multiple smaller commits controlled by %s. " +
- "Separate smaller rewrite commits can succeed independently while any commits that conflict with " +
- "another Iceberg operation will be ignored. This mode will create additional snapshots in the table " +
- "history, one for each commit.",
- PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS);
- throw new RuntimeException(errorMessage, e);
- }
-
- List<FileGroupRewriteResult> rewriteResults = rewrittenGroups.stream()
- .map(RewriteFileGroup::asResult)
- .collect(Collectors.toList());
- return new BaseRewriteDataFilesResult(rewriteResults);
- }
-
- private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
- RewriteDataFilesCommitManager commitManager) {
- ExecutorService rewriteService = rewriteService();
-
- // Start Commit Service
- int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
- RewriteDataFilesCommitManager.CommitService commitService = commitManager.service(groupsPerCommit);
- commitService.start();
-
- // Start rewrite tasks
- Tasks.foreach(groupStream)
- .suppressFailureWhenFinished()
- .executeWith(rewriteService)
- .noRetry()
- .onFailure((fileGroup, exception) -> LOG.error("Failure during rewrite group {}", fileGroup.info(), exception))
- .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
- rewriteService.shutdown();
-
- // Stop Commit service
- commitService.close();
- List<RewriteFileGroup> commitResults = commitService.results();
- if (commitResults.size() == 0) {
- LOG.error("{} is true but no rewrite commits succeeded. Check the logs to determine why the individual " +
- "commits failed. If this is persistent it may help to increase {} which will break the rewrite operation " +
- "into smaller commits.", PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS);
- }
-
- List<FileGroupRewriteResult> rewriteResults = commitResults.stream()
- .map(RewriteFileGroup::asResult)
- .collect(Collectors.toList());
- return new BaseRewriteDataFilesResult(rewriteResults);
- }
-
- Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
- Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
- Stream<RewriteFileGroup> rewriteFileGroupStream = fileGroupsByPartition.entrySet().stream()
- .flatMap(e -> {
- StructLike partition = e.getKey();
- List<List<FileScanTask>> fileGroups = e.getValue();
- return fileGroups.stream().map(tasks -> {
- int globalIndex = ctx.currentGlobalIndex();
- int partitionIndex = ctx.currentPartitionIndex(partition);
- FileGroupInfo info = new BaseRewriteDataFilesFileGroupInfo(globalIndex, partitionIndex, partition);
- return new RewriteFileGroup(info, tasks);
- });
- });
-
- return rewriteFileGroupStream.sorted(rewriteGroupComparator());
- }
-
- private Comparator<RewriteFileGroup> rewriteGroupComparator() {
- switch (rewriteJobOrder) {
- case BYTES_ASC:
- return Comparator.comparing(RewriteFileGroup::sizeInBytes);
- case BYTES_DESC:
- return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
- case FILES_ASC:
- return Comparator.comparing(RewriteFileGroup::numFiles);
- case FILES_DESC:
- return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
- default:
- return (fileGroupOne, fileGroupTwo) -> 0;
- }
- }
-
- void validateAndInitOptions() {
- Set<String> validOptions = Sets.newHashSet(strategy.validOptions());
- validOptions.addAll(VALID_OPTIONS);
-
- Set<String> invalidKeys = Sets.newHashSet(options().keySet());
- invalidKeys.removeAll(validOptions);
-
- Preconditions.checkArgument(invalidKeys.isEmpty(),
- "Cannot use options %s, they are not supported by the action or the strategy %s",
- invalidKeys, strategy.name());
-
- strategy = strategy.options(options());
-
- maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(options(),
- MAX_CONCURRENT_FILE_GROUP_REWRITES,
- MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT);
-
- maxCommits = PropertyUtil.propertyAsInt(options(),
- PARTIAL_PROGRESS_MAX_COMMITS,
- PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT);
-
- partialProgressEnabled = PropertyUtil.propertyAsBoolean(options(),
- PARTIAL_PROGRESS_ENABLED,
- PARTIAL_PROGRESS_ENABLED_DEFAULT);
-
- useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
- USE_STARTING_SEQUENCE_NUMBER,
- USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
-
- rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
- REWRITE_JOB_ORDER,
- REWRITE_JOB_ORDER_DEFAULT));
-
- Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
- "Cannot set %s to %s, the value must be positive.",
- MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);
-
- Preconditions.checkArgument(!partialProgressEnabled || maxCommits > 0,
- "Cannot set %s to %s, the value must be positive when %s is true",
- PARTIAL_PROGRESS_MAX_COMMITS, maxCommits, PARTIAL_PROGRESS_ENABLED);
- }
-
- private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
- StructLike partition = group.info().partition();
- if (partition.size() > 0) {
- return String.format("Rewriting %d files (%s, file group %d/%d, %s (%d/%d)) in %s",
- group.rewrittenFiles().size(),
- strategy.name(), group.info().globalIndex(),
- ctx.totalGroupCount(), partition, group.info().partitionIndex(), ctx.groupsInPartition(partition),
- table.name());
- } else {
- return String.format("Rewriting %d files (%s, file group %d/%d) in %s",
- group.rewrittenFiles().size(),
- strategy.name(), group.info().globalIndex(), ctx.totalGroupCount(),
- table.name());
- }
- }
-
- private BinPackStrategy binPackStrategy() {
- return new SparkBinPackStrategy(table, spark());
- }
-
- private SortStrategy sortStrategy() {
- return new SparkSortStrategy(table, spark());
- }
-
- private SortStrategy zOrderStrategy(String... columnNames) {
- return new SparkZOrderStrategy(table, spark(), Lists.newArrayList(columnNames));
- }
-
- @VisibleForTesting
- static class RewriteExecutionContext {
- private final Map<StructLike, Integer> numGroupsByPartition;
- private final int totalGroupCount;
- private final Map<StructLike, Integer> partitionIndexMap;
- private final AtomicInteger groupIndex;
-
- RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
- this.numGroupsByPartition = fileGroupsByPartition.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
- this.totalGroupCount = numGroupsByPartition.values().stream()
- .reduce(Integer::sum)
- .orElse(0);
- this.partitionIndexMap = Maps.newConcurrentMap();
- this.groupIndex = new AtomicInteger(1);
- }
-
- public int currentGlobalIndex() {
- return groupIndex.getAndIncrement();
- }
-
- public int currentPartitionIndex(StructLike partition) {
- return partitionIndexMap.merge(partition, 1, Integer::sum);
- }
-
- public int groupsInPartition(StructLike partition) {
- return numGroupsByPartition.get(partition);
- }
-
- public int totalGroupCount() {
- return totalGroupCount;
- }
+ super(spark, table);
}
}
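For illustration (not part of this commit): a minimal sketch of driving the rewrite through the exposed action class, assuming an existing SparkSession named spark, a loaded Iceberg Table named table, and the partial-progress option constants defined on the org.apache.iceberg.actions.RewriteDataFiles interface.

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    // bin-pack small files; commit rewritten groups incrementally instead of in one commit
    RewriteDataFiles.Result result = SparkActions.get(spark)
        .rewriteDataFiles(table)
        .binPack()
        .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
        .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "10")
        .execute();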
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index 1648dd26f8..d570397fc5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -19,352 +19,18 @@
package org.apache.iceberg.spark.actions;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestWriter;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
-import org.apache.iceberg.actions.RewriteManifests;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapPartitionsFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.MetadataTableType.ENTRIES;
/**
- * An action that rewrites manifests in a distributed manner and co-locates metadata for partitions.
- * <p>
- * By default, this action rewrites all manifests for the current partition spec and writes the result
- * to the metadata folder. The behavior can be modified by passing a custom predicate to {@link #rewriteIf(Predicate)}
- * and a custom spec id to {@link #specId(int)}. In addition, there is a way to configure a custom location
- * for new manifests via {@link #stagingLocation}.
+ * An action to rewrite manifests.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ * use {@link SparkActions} and {@link RewriteManifestsSparkAction} instead.
*/
-public class BaseRewriteManifestsSparkAction
- extends BaseSnapshotUpdateSparkAction<RewriteManifests, RewriteManifests.Result>
- implements RewriteManifests {
-
- private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteManifestsSparkAction.class);
-
- private static final String USE_CACHING = "use-caching";
- private static final boolean USE_CACHING_DEFAULT = true;
-
- private final Encoder<ManifestFile> manifestEncoder;
- private final Table table;
- private final int formatVersion;
- private final FileIO fileIO;
- private final long targetManifestSizeBytes;
-
- private PartitionSpec spec = null;
- private Predicate<ManifestFile> predicate = manifest -> true;
- private String stagingLocation = null;
-
+@Deprecated
+public class BaseRewriteManifestsSparkAction extends RewriteManifestsSparkAction {
public BaseRewriteManifestsSparkAction(SparkSession spark, Table table) {
- super(spark);
- this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
- this.table = table;
- this.spec = table.spec();
- this.targetManifestSizeBytes = PropertyUtil.propertyAsLong(
- table.properties(),
- TableProperties.MANIFEST_TARGET_SIZE_BYTES,
- TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
- this.fileIO = SparkUtil.serializableFileIO(table);
-
- // default the staging location to the metadata location
- TableOperations ops = ((HasTableOperations) table).operations();
- Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
- this.stagingLocation = metadataFilePath.getParent().toString();
-
- // use the current table format version for new manifests
- this.formatVersion = ops.current().formatVersion();
- }
-
- @Override
- protected RewriteManifests self() {
- return this;
- }
-
- @Override
- public RewriteManifests specId(int specId) {
- Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %s", specId);
- this.spec = table.specs().get(specId);
- return this;
- }
-
- @Override
- public RewriteManifests rewriteIf(Predicate<ManifestFile> newPredicate) {
- this.predicate = newPredicate;
- return this;
- }
-
- @Override
- public RewriteManifests stagingLocation(String newStagingLocation) {
- this.stagingLocation = newStagingLocation;
- return this;
- }
-
- @Override
- public RewriteManifests.Result execute() {
- String desc = String.format("Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name());
- JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
- return withJobGroupInfo(info, this::doExecute);
- }
-
- private RewriteManifests.Result doExecute() {
- List<ManifestFile> matchingManifests = findMatchingManifests();
- if (matchingManifests.isEmpty()) {
- return BaseRewriteManifestsActionResult.empty();
- }
-
- long totalSizeBytes = 0L;
- int numEntries = 0;
-
- for (ManifestFile manifest : matchingManifests) {
- ValidationException.check(hasFileCounts(manifest), "No file counts in manifest: %s", manifest.path());
-
- totalSizeBytes += manifest.length();
- numEntries += manifest.addedFilesCount() + manifest.existingFilesCount() + manifest.deletedFilesCount();
- }
-
- int targetNumManifests = targetNumManifests(totalSizeBytes);
- int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests);
-
- Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);
-
- List<ManifestFile> newManifests;
- if (spec.fields().size() < 1) {
- newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
- } else {
- newManifests = writeManifestsForPartitionedTable(manifestEntryDF, targetNumManifests, targetNumManifestEntries);
- }
-
- replaceManifests(matchingManifests, newManifests);
-
- return new BaseRewriteManifestsActionResult(matchingManifests, newManifests);
- }
-
- private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
- Dataset<Row> manifestDF = spark()
- .createDataset(Lists.transform(manifests, ManifestFile::path), Encoders.STRING())
- .toDF("manifest");
-
- Dataset<Row> manifestEntryDF = loadMetadataTable(table, ENTRIES)
- .filter("status < 2") // select only live entries
- .selectExpr("input_file_name() as manifest", "snapshot_id", "sequence_number", "data_file");
-
- Column joinCond = manifestDF.col("manifest").equalTo(manifestEntryDF.col("manifest"));
- return manifestEntryDF
- .join(manifestDF, joinCond, "left_semi")
- .select("snapshot_id", "sequence_number", "data_file");
- }
-
- private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
- StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
-
- // we rely only on the target number of manifests for unpartitioned tables
- // as we should not worry about having too much metadata per partition
- long maxNumManifestEntries = Long.MAX_VALUE;
-
- return manifestEntryDF
- .repartition(numManifests)
- .mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
- manifestEncoder
- )
- .collectAsList();
- }
-
- private List<ManifestFile> writeManifestsForPartitionedTable(
- Dataset<Row> manifestEntryDF, int numManifests,
- int targetNumManifestEntries) {
-
- Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
- StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
-
- // we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
- long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
-
- return withReusableDS(manifestEntryDF, df -> {
- Column partitionColumn = df.col("data_file.partition");
- return df.repartitionByRange(numManifests, partitionColumn)
- .sortWithinPartitions(partitionColumn)
- .mapPartitions(
- toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
- manifestEncoder
- )
- .collectAsList();
- });
- }
-
- private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
- Dataset<T> reusableDS;
- boolean useCaching = PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
- if (useCaching) {
- reusableDS = ds.cache();
- } else {
- int parallelism = SQLConf.get().numShufflePartitions();
- reusableDS = ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
- }
-
- try {
- return func.apply(reusableDS);
- } finally {
- if (useCaching) {
- reusableDS.unpersist(false);
- }
- }
- }
-
- private List<ManifestFile> findMatchingManifests() {
- Snapshot currentSnapshot = table.currentSnapshot();
-
- if (currentSnapshot == null) {
- return ImmutableList.of();
- }
-
- return currentSnapshot.dataManifests(fileIO).stream()
- .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
- .collect(Collectors.toList());
- }
-
- private int targetNumManifests(long totalSizeBytes) {
- return (int) ((totalSizeBytes + targetManifestSizeBytes - 1) / targetManifestSizeBytes);
- }
-
- private int targetNumManifestEntries(int numEntries, int numManifests) {
- return (numEntries + numManifests - 1) / numManifests;
- }
-
- private boolean hasFileCounts(ManifestFile manifest) {
- return manifest.addedFilesCount() != null &&
- manifest.existingFilesCount() != null &&
- manifest.deletedFilesCount() != null;
- }
-
- private void replaceManifests(Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) {
- try {
- boolean snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean(
- table.properties(),
- TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
- TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
-
- org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests();
- deletedManifests.forEach(rewriteManifests::deleteManifest);
- addedManifests.forEach(rewriteManifests::addManifest);
- commit(rewriteManifests);
-
- if (!snapshotIdInheritanceEnabled) {
- // delete new manifests as they were rewritten before the commit
- deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
- }
- } catch (CommitStateUnknownException commitStateUnknownException) {
- // don't clean up added manifest files, because they may have been successfully committed.
- throw commitStateUnknownException;
- } catch (Exception e) {
- // delete all new manifests because the rewrite failed
- deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
- throw e;
- }
- }
-
- private void deleteFiles(Iterable<String> locations) {
- Tasks.foreach(locations)
- .executeWith(ThreadPools.getWorkerPool())
- .noRetry()
- .suppressFailureWhenFinished()
- .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
- .run(fileIO::deleteFile);
- }
-
- private static ManifestFile writeManifest(
- List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
- String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
-
- String manifestName = "optimized-m-" + UUID.randomUUID();
- Path manifestPath = new Path(location, manifestName);
- OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
-
- Types.StructType dataFileType = DataFile.getType(spec.partitionType());
- SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
-
- ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
-
- try {
- for (int index = startIndex; index < endIndex; index++) {
- Row row = rows.get(index);
- long snapshotId = row.getLong(0);
- long sequenceNumber = row.getLong(1);
- Row file = row.getStruct(2);
- writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber);
- }
- } finally {
- writer.close();
- }
-
- return writer.toManifestFile();
- }
-
- private static MapPartitionsFunction<Row, ManifestFile> toManifests(
- Broadcast<FileIO> io, long maxNumManifestEntries, String location,
- int format, PartitionSpec spec, StructType sparkType) {
-
- return rows -> {
- List<Row> rowsAsList = Lists.newArrayList(rows);
-
- if (rowsAsList.isEmpty()) {
- return Collections.emptyIterator();
- }
-
- List<ManifestFile> manifests = Lists.newArrayList();
- if (rowsAsList.size() <= maxNumManifestEntries) {
- manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
- } else {
- int midIndex = rowsAsList.size() / 2;
- manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
- manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
- }
-
- return manifests.iterator();
- };
+ super(spark, table);
}
}
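For illustration (not part of this commit): a sketch of the manifest rewrite driven through the exposed RewriteManifestsSparkAction, assuming a SparkSession named spark and a Table named table; the 10 MB threshold is an arbitrary placeholder. USE_CACHING is the option made public by this change.

    import org.apache.iceberg.actions.RewriteManifests;
    import org.apache.iceberg.spark.actions.RewriteManifestsSparkAction;
    import org.apache.iceberg.spark.actions.SparkActions;

    // rewrite only manifests smaller than ~10 MB and skip Dataset caching during the rewrite
    RewriteManifests.Result result = SparkActions.get(spark)
        .rewriteManifests(table)
        .rewriteIf(manifest -> manifest.length() < 10L * 1024 * 1024)
        .option(RewriteManifestsSparkAction.USE_CACHING, "false")
        .execute();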
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
index e584d33a3f..3305401e53 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
@@ -19,186 +19,19 @@
package org.apache.iceberg.spark.actions;
-import java.util.Map;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.actions.BaseSnapshotTableActionResult;
-import org.apache.iceberg.actions.SnapshotTable;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.spark.Spark3Util;
-import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
/**
- * Creates a new Iceberg table based on a source Spark table. The new Iceberg table will
- * have a different data and metadata directory allowing it to exist independently of the
- * source table.
+ * An action to snapshot a table as an Iceberg table.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ * use {@link SparkActions} and {@link SnapshotTableSparkAction} instead.
*/
-public class BaseSnapshotTableSparkAction
- extends BaseTableCreationSparkAction<SnapshotTable, SnapshotTable.Result>
- implements SnapshotTable {
-
- private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
-
- private StagingTableCatalog destCatalog;
- private Identifier destTableIdent;
- private String destTableLocation = null;
-
+@Deprecated
+public class BaseSnapshotTableSparkAction extends SnapshotTableSparkAction {
BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
super(spark, sourceCatalog, sourceTableIdent);
}
-
- @Override
- protected SnapshotTable self() {
- return this;
- }
-
- @Override
- protected StagingTableCatalog destCatalog() {
- return destCatalog;
- }
-
- @Override
- protected Identifier destTableIdent() {
- return destTableIdent;
- }
-
- @Override
- public SnapshotTable as(String ident) {
- String ctx = "snapshot destination";
- CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
- CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
- this.destCatalog = checkDestinationCatalog(catalogAndIdent.catalog());
- this.destTableIdent = catalogAndIdent.identifier();
- return this;
- }
-
- @Override
- public SnapshotTable tableProperties(Map<String, String> properties) {
- setProperties(properties);
- return this;
- }
-
- @Override
- public SnapshotTable tableProperty(String property, String value) {
- setProperty(property, value);
- return this;
- }
-
- @Override
- public SnapshotTable.Result execute() {
- String desc = String.format("Snapshotting table %s as %s", sourceTableIdent(), destTableIdent);
- JobGroupInfo info = newJobGroupInfo("SNAPSHOT-TABLE", desc);
- return withJobGroupInfo(info, this::doExecute);
- }
-
- private SnapshotTable.Result doExecute() {
- Preconditions.checkArgument(destCatalog() != null && destTableIdent() != null,
- "The destination catalog and identifier cannot be null. " +
- "Make sure to configure the action with a valid destination table identifier via the `as` method.");
-
- LOG.info("Staging a new Iceberg table {} as a snapshot of {}", destTableIdent(), sourceTableIdent());
- StagedSparkTable stagedTable = stageDestTable();
- Table icebergTable = stagedTable.table();
-
- // TODO: Check the dest table location does not overlap with the source table location
-
- boolean threw = true;
- try {
- LOG.info("Ensuring {} has a valid name mapping", destTableIdent());
- ensureNameMappingPresent(icebergTable);
-
- TableIdentifier v1TableIdent = v1SourceTable().identifier();
- String stagingLocation = getMetadataLocation(icebergTable);
- LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
- SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, stagingLocation);
-
- LOG.info("Committing staged changes to {}", destTableIdent());
- stagedTable.commitStagedChanges();
- threw = false;
- } finally {
- if (threw) {
- LOG.error("Error when populating the staged table with metadata, aborting changes");
-
- try {
- stagedTable.abortStagedChanges();
- } catch (Exception abortException) {
- LOG.error("Cannot abort staged changes", abortException);
- }
- }
- }
-
- Snapshot snapshot = icebergTable.currentSnapshot();
- long importedDataFilesCount = Long.parseLong(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
- LOG.info("Successfully loaded Iceberg metadata for {} files to {}", importedDataFilesCount, destTableIdent());
- return new BaseSnapshotTableActionResult(importedDataFilesCount);
- }
-
- @Override
- protected Map<String, String> destTableProps() {
- Map<String, String> properties = Maps.newHashMap();
-
- // copy over relevant source table props
- properties.putAll(JavaConverters.mapAsJavaMapConverter(v1SourceTable().properties()).asJava());
- EXCLUDED_PROPERTIES.forEach(properties::remove);
-
- // remove any possible location properties from origin properties
- properties.remove(LOCATION);
- properties.remove(TableProperties.WRITE_METADATA_LOCATION);
- properties.remove(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
- properties.remove(TableProperties.OBJECT_STORE_PATH);
- properties.remove(TableProperties.WRITE_DATA_LOCATION);
-
- // set default and user-provided props
- properties.put(TableCatalog.PROP_PROVIDER, "iceberg");
- properties.putAll(additionalProperties());
-
- // make sure we mark this table as a snapshot table
- properties.put(TableProperties.GC_ENABLED, "false");
- properties.put("snapshot", "true");
-
- // set the destination table location if provided
- if (destTableLocation != null) {
- properties.put(LOCATION, destTableLocation);
- }
-
- return properties;
- }
-
- @Override
- protected TableCatalog checkSourceCatalog(CatalogPlugin catalog) {
- // currently the import code relies on being able to look up the table in the session catalog
- Preconditions.checkArgument(catalog.name().equalsIgnoreCase("spark_catalog"),
- "Cannot snapshot a table that isn't in the session catalog (i.e. spark_catalog). " +
- "Found source catalog: %s.", catalog.name());
-
- Preconditions.checkArgument(catalog instanceof TableCatalog,
- "Cannot snapshot as catalog %s of class %s in not a table catalog",
- catalog.name(), catalog.getClass().getName());
-
- return (TableCatalog) catalog;
- }
-
- @Override
- public SnapshotTable tableLocation(String location) {
- Preconditions.checkArgument(!sourceTableLocation().equals(location),
- "The snapshot table location cannot be same as the source table location. " +
- "This would mix snapshot table files with original table files.");
- this.destTableLocation = location;
- return this;
- }
}
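For illustration (not part of this commit): a sketch of creating a snapshot table through the exposed action, assuming a SparkSession named spark; the table identifiers are placeholders.

    import org.apache.iceberg.actions.SnapshotTable;
    import org.apache.iceberg.spark.actions.SparkActions;

    // snapshot a session-catalog table into an independent Iceberg table
    SnapshotTable.Result result = SparkActions.get(spark)
        .snapshotTable("spark_catalog.db.source_table")
        .as("spark_catalog.db.source_table_snapshot")
        .execute();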
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
index 53fa06bbb5..7ed17d75dd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java
@@ -20,12 +20,10 @@
package org.apache.iceberg.spark.actions;
import java.util.Map;
-import org.apache.iceberg.actions.SnapshotUpdate;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.spark.sql.SparkSession;
-abstract class BaseSnapshotUpdateSparkAction<ThisT, R>
- extends BaseSparkAction<ThisT, R> implements SnapshotUpdate<ThisT, R> {
+abstract class BaseSnapshotUpdateSparkAction<ThisT> extends BaseSparkAction<ThisT> {
private final Map<String, String> summary = Maps.newHashMap();
@@ -33,7 +31,6 @@ abstract class BaseSnapshotUpdateSparkAction<ThisT, R>
super(spark);
}
- @Override
public ThisT snapshotProperty(String property, String value) {
summary.put(property, value);
return self();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 04af49db6b..8b9821f40c 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -34,7 +34,6 @@ import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.actions.Action;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
@@ -59,7 +58,7 @@ import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
-abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
+abstract class BaseSparkAction<ThisT> {
protected static final String CONTENT_FILE = "Content File";
protected static final String MANIFEST = "Manifest";
@@ -91,13 +90,11 @@ abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
protected abstract ThisT self();
- @Override
public ThisT option(String name, String value) {
options.put(name, value);
return self();
}
- @Override
public ThisT options(Map<String, String> newOptions) {
options.putAll(newOptions);
return self();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
index 6eadece65c..e3ddd7abc9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java
@@ -49,7 +49,7 @@ import org.apache.spark.sql.connector.catalog.V1Table;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
-abstract class BaseTableCreationSparkAction<ThisT, R> extends BaseSparkAction<ThisT, R> {
+abstract class BaseTableCreationSparkAction<ThisT> extends BaseSparkAction<ThisT> {
private static final Set<String> ALLOWED_SOURCES = ImmutableSet.of("parquet", "avro", "orc", "hive");
protected static final String LOCATION = "location";
protected static final String ICEBERG_METADATA_FOLDER = "metadata";
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
similarity index 94%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index dc58a05d4d..72b4f8f43a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -90,10 +90,10 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
* <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
* the state of the table if another operation is writing at the same time.
*/
-public class BaseDeleteOrphanFilesSparkAction
- extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
+public class DeleteOrphanFilesSparkAction
+ extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
- private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
int lastIndex = path.lastIndexOf(File.separator);
if (lastIndex == -1) {
@@ -119,7 +119,7 @@ public class BaseDeleteOrphanFilesSparkAction
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;
- public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
+ DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
super(spark);
this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
@@ -133,35 +133,35 @@ public class BaseDeleteOrphanFilesSparkAction
}
@Override
- protected DeleteOrphanFiles self() {
+ protected DeleteOrphanFilesSparkAction self() {
return this;
}
@Override
- public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
+ public DeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
this.deleteExecutorService = executorService;
return this;
}
@Override
- public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
+ public DeleteOrphanFilesSparkAction location(String newLocation) {
this.location = newLocation;
return this;
}
@Override
- public BaseDeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
+ public DeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
this.olderThanTimestamp = newOlderThanTimestamp;
return this;
}
@Override
- public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
+ public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
this.deleteFunc = newDeleteFunc;
return this;
}
- public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
+ public DeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
StructType schema = files.schema();
StructField filePathField = schema.apply(FILE_PATH);
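For illustration (not part of this commit): a sketch of an orphan-file scan in dry-run form, assuming a SparkSession named spark and a Table named table; the custom delete function only records candidates instead of removing them.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.actions.DeleteOrphanFiles;
    import org.apache.iceberg.spark.actions.SparkActions;

    List<String> candidates = new ArrayList<>();

    // find files under the table location older than 3 days that no snapshot references
    DeleteOrphanFiles.Result result = SparkActions.get(spark)
        .deleteOrphanFiles(table)
        .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
        .deleteWith(candidates::add)
        .execute();

    Iterable<String> orphans = result.orphanFileLocations();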
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
similarity index 91%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
index fc2a5fa5cf..e840d90c2c 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java
@@ -50,13 +50,13 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
* to determine which files should be deleted.
*/
@SuppressWarnings("UnnecessaryAnonymousClass")
-public class BaseDeleteReachableFilesSparkAction
- extends BaseSparkAction<DeleteReachableFiles, DeleteReachableFiles.Result> implements DeleteReachableFiles {
+public class DeleteReachableFilesSparkAction
+ extends BaseSparkAction<DeleteReachableFilesSparkAction> implements DeleteReachableFiles {
public static final String STREAM_RESULTS = "stream-results";
public static final boolean STREAM_RESULTS_DEFAULT = false;
- private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DeleteReachableFilesSparkAction.class);
private final String metadataFileLocation;
private final Consumer<String> defaultDelete = new Consumer<String>() {
@@ -70,30 +70,30 @@ public class BaseDeleteReachableFilesSparkAction
private ExecutorService deleteExecutorService = null;
private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());
- public BaseDeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
+ DeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
super(spark);
this.metadataFileLocation = metadataFileLocation;
}
@Override
- protected DeleteReachableFiles self() {
+ protected DeleteReachableFilesSparkAction self() {
return this;
}
@Override
- public DeleteReachableFiles io(FileIO fileIO) {
+ public DeleteReachableFilesSparkAction io(FileIO fileIO) {
this.io = fileIO;
return this;
}
@Override
- public DeleteReachableFiles deleteWith(Consumer<String> newDeleteFunc) {
+ public DeleteReachableFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
this.deleteFunc = newDeleteFunc;
return this;
}
@Override
- public DeleteReachableFiles executeDeleteWith(ExecutorService executorService) {
+ public DeleteReachableFilesSparkAction executeDeleteWith(ExecutorService executorService) {
this.deleteExecutorService = executorService;
return this;
}
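For illustration (not part of this commit): a sketch of dropping every file reachable from a table's metadata file, assuming a SparkSession named spark; the metadata location is a placeholder and the table must no longer be in use. STREAM_RESULTS is the public option shown above.

    import org.apache.iceberg.actions.DeleteReachableFiles;
    import org.apache.iceberg.spark.actions.DeleteReachableFilesSparkAction;
    import org.apache.iceberg.spark.actions.SparkActions;

    // irreversibly removes data, manifests, manifest lists and metadata files
    DeleteReachableFiles.Result result = SparkActions.get(spark)
        .deleteReachableFiles("s3://bucket/db/tbl/metadata/v123.metadata.json")
        .option(DeleteReachableFilesSparkAction.STREAM_RESULTS, "true")
        .execute();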
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
similarity index 93%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 2216383337..f3fc7bdc14 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -65,13 +65,13 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
* Deletes are still performed locally after retrieving the results from the Spark executors.
*/
@SuppressWarnings("UnnecessaryAnonymousClass")
-public class BaseExpireSnapshotsSparkAction
- extends BaseSparkAction<ExpireSnapshots, ExpireSnapshots.Result> implements ExpireSnapshots {
+public class ExpireSnapshotsSparkAction
+ extends BaseSparkAction<ExpireSnapshotsSparkAction> implements ExpireSnapshots {
public static final String STREAM_RESULTS = "stream-results";
public static final boolean STREAM_RESULTS_DEFAULT = false;
- private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsSparkAction.class);
private final Table table;
private final TableOperations ops;
@@ -89,7 +89,7 @@ public class BaseExpireSnapshotsSparkAction
private ExecutorService deleteExecutorService = null;
private Dataset<Row> expiredFiles = null;
- public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) {
+ ExpireSnapshotsSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
this.ops = ((HasTableOperations) table).operations();
@@ -100,30 +100,30 @@ public class BaseExpireSnapshotsSparkAction
}
@Override
- protected ExpireSnapshots self() {
+ protected ExpireSnapshotsSparkAction self() {
return this;
}
@Override
- public BaseExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorService) {
+ public ExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorService) {
this.deleteExecutorService = executorService;
return this;
}
@Override
- public BaseExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) {
+ public ExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) {
expiredSnapshotIds.add(snapshotId);
return this;
}
@Override
- public BaseExpireSnapshotsSparkAction expireOlderThan(long timestampMillis) {
+ public ExpireSnapshotsSparkAction expireOlderThan(long timestampMillis) {
this.expireOlderThanValue = timestampMillis;
return this;
}
@Override
- public BaseExpireSnapshotsSparkAction retainLast(int numSnapshots) {
+ public ExpireSnapshotsSparkAction retainLast(int numSnapshots) {
Preconditions.checkArgument(1 <= numSnapshots,
"Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
this.retainLastValue = numSnapshots;
@@ -131,7 +131,7 @@ public class BaseExpireSnapshotsSparkAction
}
@Override
- public BaseExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
+ public ExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
this.deleteFunc = newDeleteFunc;
return this;
}
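For illustration (not part of this commit): a sketch of snapshot expiration through the exposed action, assuming a SparkSession named spark and a Table named table; the retention values are placeholders. STREAM_RESULTS is the public option shown above.

    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
    import org.apache.iceberg.spark.actions.SparkActions;

    // expire snapshots older than 7 days but keep at least the last 5
    ExpireSnapshots.Result result = SparkActions.get(spark)
        .expireSnapshots(table)
        .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
        .retainLast(5)
        .option(ExpireSnapshotsSparkAction.STREAM_RESULTS, "true")
        .execute();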
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
similarity index 93%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
index ef9f0d3e25..7146bffcbe 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
@@ -50,18 +50,18 @@ import scala.collection.JavaConverters;
* previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
* table.
*/
-public class BaseMigrateTableSparkAction
- extends BaseTableCreationSparkAction<MigrateTable, MigrateTable.Result>
+public class MigrateTableSparkAction
+ extends BaseTableCreationSparkAction<MigrateTableSparkAction>
implements MigrateTable {
- private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MigrateTableSparkAction.class);
private static final String BACKUP_SUFFIX = "_BACKUP_";
private final StagingTableCatalog destCatalog;
private final Identifier destTableIdent;
private final Identifier backupIdent;
- public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+ MigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
super(spark, sourceCatalog, sourceTableIdent);
this.destCatalog = checkDestinationCatalog(sourceCatalog);
this.destTableIdent = sourceTableIdent;
@@ -70,7 +70,7 @@ public class BaseMigrateTableSparkAction
}
@Override
- protected MigrateTable self() {
+ protected MigrateTableSparkAction self() {
return this;
}
@@ -85,13 +85,13 @@ public class BaseMigrateTableSparkAction
}
@Override
- public MigrateTable tableProperties(Map<String, String> properties) {
+ public MigrateTableSparkAction tableProperties(Map<String, String> properties) {
setProperties(properties);
return this;
}
@Override
- public MigrateTable tableProperty(String property, String value) {
+ public MigrateTableSparkAction tableProperty(String property, String value) {
setProperty(property, value);
return this;
}
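For illustration (not part of this commit): a sketch of an in-place migration through the exposed action, assuming a SparkSession named spark; the table identifier and property value are placeholders.

    import org.apache.iceberg.actions.MigrateTable;
    import org.apache.iceberg.spark.actions.SparkActions;

    // migrate a session-catalog parquet/hive table to Iceberg, keeping the original as a backup
    MigrateTable.Result result = SparkActions.get(spark)
        .migrateTable("spark_catalog.db.legacy_table")
        .tableProperty("comment", "migrated to Iceberg")
        .execute();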
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
similarity index 96%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index d1937fcbcd..bb2cbd8329 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -72,10 +72,11 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BaseRewriteDataFilesSparkAction
- extends BaseSnapshotUpdateSparkAction<RewriteDataFiles, RewriteDataFiles.Result> implements RewriteDataFiles {
+public class RewriteDataFilesSparkAction
+ extends BaseSnapshotUpdateSparkAction<RewriteDataFilesSparkAction>
+ implements RewriteDataFiles {
- private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesSparkAction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesSparkAction.class);
private static final Set<String> VALID_OPTIONS = ImmutableSet.of(
MAX_CONCURRENT_FILE_GROUP_REWRITES,
MAX_FILE_GROUP_SIZE_BYTES,
@@ -96,18 +97,18 @@ public class BaseRewriteDataFilesSparkAction
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;
- protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
+ RewriteDataFilesSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
}
@Override
- protected RewriteDataFiles self() {
+ protected RewriteDataFilesSparkAction self() {
return this;
}
@Override
- public RewriteDataFiles binPack() {
+ public RewriteDataFilesSparkAction binPack() {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to binpack, it has already been set", this.strategy);
this.strategy = binPackStrategy();
@@ -115,7 +116,7 @@ public class BaseRewriteDataFilesSparkAction
}
@Override
- public RewriteDataFiles sort(SortOrder sortOrder) {
+ public RewriteDataFilesSparkAction sort(SortOrder sortOrder) {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to sort, it has already been set to %s", this.strategy);
this.strategy = sortStrategy().sortOrder(sortOrder);
@@ -123,7 +124,7 @@ public class BaseRewriteDataFilesSparkAction
}
@Override
- public RewriteDataFiles sort() {
+ public RewriteDataFilesSparkAction sort() {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to sort, it has already been set to %s", this.strategy);
this.strategy = sortStrategy();
@@ -131,7 +132,7 @@ public class BaseRewriteDataFilesSparkAction
}
@Override
- public RewriteDataFiles zOrder(String... columnNames) {
+ public RewriteDataFilesSparkAction zOrder(String... columnNames) {
Preconditions.checkArgument(this.strategy == null,
"Cannot set strategy to zorder, it has already been set to %s", this.strategy);
this.strategy = zOrderStrategy(columnNames);
@@ -139,7 +140,7 @@ public class BaseRewriteDataFilesSparkAction
}
@Override
- public RewriteDataFiles filter(Expression expression) {
+ public RewriteDataFilesSparkAction filter(Expression expression) {
filter = Expressions.and(filter, expression);
return this;
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
similarity index 95%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 1648dd26f8..99e51a37aa 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -79,14 +79,14 @@ import static org.apache.iceberg.MetadataTableType.ENTRIES;
* and a custom spec id to {@link #specId(int)}. In addition, there is a way to configure a custom location
* for new manifests via {@link #stagingLocation}.
*/
-public class BaseRewriteManifestsSparkAction
- extends BaseSnapshotUpdateSparkAction<RewriteManifests, RewriteManifests.Result>
+public class RewriteManifestsSparkAction
+ extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction>
implements RewriteManifests {
- private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteManifestsSparkAction.class);
+ public static final String USE_CACHING = "use-caching";
+ public static final boolean USE_CACHING_DEFAULT = true;
- private static final String USE_CACHING = "use-caching";
- private static final boolean USE_CACHING_DEFAULT = true;
+ private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
private final Encoder<ManifestFile> manifestEncoder;
private final Table table;
@@ -98,7 +98,7 @@ public class BaseRewriteManifestsSparkAction
private Predicate<ManifestFile> predicate = manifest -> true;
private String stagingLocation = null;
- public BaseRewriteManifestsSparkAction(SparkSession spark, Table table) {
+ RewriteManifestsSparkAction(SparkSession spark, Table table) {
super(spark);
this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
this.table = table;
@@ -119,25 +119,25 @@ public class BaseRewriteManifestsSparkAction
}
@Override
- protected RewriteManifests self() {
+ protected RewriteManifestsSparkAction self() {
return this;
}
@Override
- public RewriteManifests specId(int specId) {
+ public RewriteManifestsSparkAction specId(int specId) {
Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %s", specId);
this.spec = table.specs().get(specId);
return this;
}
@Override
- public RewriteManifests rewriteIf(Predicate<ManifestFile> newPredicate) {
+ public RewriteManifestsSparkAction rewriteIf(Predicate<ManifestFile> newPredicate) {
this.predicate = newPredicate;
return this;
}
@Override
- public RewriteManifests stagingLocation(String newStagingLocation) {
+ public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) {
this.stagingLocation = newStagingLocation;
return this;
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
similarity index 92%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
index e584d33a3f..526b46af1a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotTableSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
@@ -48,22 +48,22 @@ import scala.collection.JavaConverters;
* have a different data and metadata directory allowing it to exist independently of the
* source table.
*/
-public class BaseSnapshotTableSparkAction
- extends BaseTableCreationSparkAction<SnapshotTable, SnapshotTable.Result>
+public class SnapshotTableSparkAction
+ extends BaseTableCreationSparkAction<SnapshotTableSparkAction>
implements SnapshotTable {
- private static final Logger LOG = LoggerFactory.getLogger(BaseSnapshotTableSparkAction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotTableSparkAction.class);
private StagingTableCatalog destCatalog;
private Identifier destTableIdent;
private String destTableLocation = null;
- BaseSnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
+ SnapshotTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
super(spark, sourceCatalog, sourceTableIdent);
}
@Override
- protected SnapshotTable self() {
+ protected SnapshotTableSparkAction self() {
return this;
}
@@ -78,7 +78,7 @@ public class BaseSnapshotTableSparkAction
}
@Override
- public SnapshotTable as(String ident) {
+ public SnapshotTableSparkAction as(String ident) {
String ctx = "snapshot destination";
CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), ident, defaultCatalog);
@@ -88,13 +88,13 @@ public class BaseSnapshotTableSparkAction
}
@Override
- public SnapshotTable tableProperties(Map<String, String> properties) {
+ public SnapshotTableSparkAction tableProperties(Map<String, String> properties) {
setProperties(properties);
return this;
}
@Override
- public SnapshotTable tableProperty(String property, String value) {
+ public SnapshotTableSparkAction tableProperty(String property, String value) {
setProperty(property, value);
return this;
}
@@ -194,7 +194,7 @@ public class BaseSnapshotTableSparkAction
}
@Override
- public SnapshotTable tableLocation(String location) {
+ public SnapshotTableSparkAction tableLocation(String location) {
Preconditions.checkArgument(!sourceTableLocation().equals(location),
"The snapshot table location cannot be same as the source table location. " +
"This would mix snapshot table files with original table files.");
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index d77a13d128..7131f869f3 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -21,13 +21,6 @@ package org.apache.iceberg.spark.actions;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ActionsProvider;
-import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.actions.DeleteReachableFiles;
-import org.apache.iceberg.actions.ExpireSnapshots;
-import org.apache.iceberg.actions.MigrateTable;
-import org.apache.iceberg.actions.RewriteDataFiles;
-import org.apache.iceberg.actions.RewriteManifests;
-import org.apache.iceberg.actions.SnapshotTable;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.spark.sql.SparkSession;
@@ -56,7 +49,7 @@ public class SparkActions implements ActionsProvider {
}
@Override
- public SnapshotTable snapshotTable(String tableIdent) {
+ public SnapshotTableSparkAction snapshotTable(String tableIdent) {
String ctx = "snapshot source";
CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog();
CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark, tableIdent, defaultCatalog);
@@ -64,7 +57,7 @@ public class SparkActions implements ActionsProvider {
}
@Override
- public MigrateTable migrateTable(String tableIdent) {
+ public MigrateTableSparkAction migrateTable(String tableIdent) {
String ctx = "migrate target";
CatalogPlugin defaultCatalog = spark.sessionState().catalogManager().currentCatalog();
CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark, tableIdent, defaultCatalog);
@@ -72,27 +65,27 @@ public class SparkActions implements ActionsProvider {
}
@Override
- public RewriteDataFiles rewriteDataFiles(Table table) {
+ public RewriteDataFilesSparkAction rewriteDataFiles(Table table) {
return new BaseRewriteDataFilesSparkAction(spark, table);
}
@Override
- public DeleteOrphanFiles deleteOrphanFiles(Table table) {
+ public DeleteOrphanFilesSparkAction deleteOrphanFiles(Table table) {
return new BaseDeleteOrphanFilesSparkAction(spark, table);
}
@Override
- public RewriteManifests rewriteManifests(Table table) {
+ public RewriteManifestsSparkAction rewriteManifests(Table table) {
return new BaseRewriteManifestsSparkAction(spark, table);
}
@Override
- public ExpireSnapshots expireSnapshots(Table table) {
+ public ExpireSnapshotsSparkAction expireSnapshots(Table table) {
return new BaseExpireSnapshotsSparkAction(spark, table);
}
@Override
- public DeleteReachableFiles deleteReachableFiles(String metadataLocation) {
+ public DeleteReachableFilesSparkAction deleteReachableFiles(String metadataLocation) {
return new BaseDeleteReachableFilesSparkAction(spark, metadataLocation);
}
}
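For illustration (not part of this commit): because SparkActions now returns the concrete Spark action classes, Spark-specific methods such as compareToFileList are reachable without a cast, which the procedure changes below rely on. A sketch assuming a SparkSession named spark, a Table named table, and a view named file_inventory with file_path and last_modified columns.

    import org.apache.iceberg.actions.DeleteOrphanFiles;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> knownFiles = spark.table("file_inventory");

    // no cast to the action implementation is needed anymore
    DeleteOrphanFiles.Result result = SparkActions.get(spark)
        .deleteOrphanFiles(table)
        .compareToFileList(knownFiles)
        .execute();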
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 042fa6e4bb..272cacc4d4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -22,7 +22,7 @@ package org.apache.iceberg.spark.procedures;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.spark.actions.BaseExpireSnapshotsSparkAction;
+import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.DateTimeUtil;
@@ -108,7 +108,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
}
if (streamResult != null) {
- action.option(BaseExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
+ action.option(ExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
}
ExpireSnapshots.Result result = action.execute();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 23d6471908..a64a28b631 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.spark.actions.BaseDeleteOrphanFilesSparkAction;
+import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.DateTimeUtil;
@@ -95,7 +95,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
"max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
return withIcebergTable(tableIdent, table -> {
- DeleteOrphanFiles action = actions().deleteOrphanFiles(table);
+ DeleteOrphanFilesSparkAction action = actions().deleteOrphanFiles(table);
if (olderThanMillis != null) {
boolean isTesting = Boolean.parseBoolean(spark().conf().get("spark.testing", "false"));
@@ -118,7 +118,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
}
if (fileListView != null) {
- ((BaseDeleteOrphanFilesSparkAction) action).compareToFileList(spark().table(fileListView));
+ action.compareToFileList(spark().table(fileListView));
}
DeleteOrphanFiles.Result result = action.execute();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index eae7ce0fca..1cc76501c0 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.spark.procedures;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.actions.RewriteManifestsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -82,10 +83,10 @@ class RewriteManifestsProcedure extends BaseProcedure {
Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
return modifyIcebergTable(tableIdent, table -> {
- RewriteManifests action = actions().rewriteManifests(table);
+ RewriteManifestsSparkAction action = actions().rewriteManifests(table);
if (useCaching != null) {
- action.option("use-caching", useCaching.toString());
+ action.option(RewriteManifestsSparkAction.USE_CACHING, useCaching.toString());
}
RewriteManifests.Result result = action.execute();
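Note: the procedure can now reference the USE_CACHING constant instead of the hard-coded "use-caching" string. A minimal sketch of the equivalent direct call, assuming a loaded Table in a variable named table (illustrative only):

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.spark.actions.RewriteManifestsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;

RewriteManifestsSparkAction action = SparkActions.get().rewriteManifests(table);
// USE_CACHING replaces the raw "use-caching" option key used before this commit.
action.option(RewriteManifestsSparkAction.USE_CACHING, "false");
RewriteManifests.Result result = action.execute();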
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 2c324a88f1..232a85f984 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1096,7 +1096,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
Set<String> deletedFiles = Sets.newHashSet();
- BaseExpireSnapshotsSparkAction action = (BaseExpireSnapshotsSparkAction) SparkActions.get().expireSnapshots(table)
+ ExpireSnapshotsSparkAction action = SparkActions.get().expireSnapshots(table)
.expireOlderThan(tAfterCommits)
.deleteWith(deletedFiles::add);
Dataset<Row> pendingDeletes = action.expire();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index a56e888207..1d9e479397 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -892,7 +892,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
.withColumnRenamed("lastModified", "last_modified");
DeleteOrphanFiles.Result result1 =
- ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+ actions.deleteOrphanFiles(table)
.compareToFileList(compareToFileList)
.deleteWith(s -> { })
.execute();
@@ -901,7 +901,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
Iterables.isEmpty(result1.orphanFileLocations()));
DeleteOrphanFiles.Result result2 =
- ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+ actions.deleteOrphanFiles(table)
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.deleteWith(s -> { })
@@ -912,7 +912,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
"Invalid file should be present", fs.exists(new Path(invalidFilePaths.get(0))));
DeleteOrphanFiles.Result result3 =
- ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+ actions.deleteOrphanFiles(table)
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.execute();
@@ -940,7 +940,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
.withColumnRenamed("lastModified", "last_modified");
DeleteOrphanFiles.Result result4 =
- ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
+ actions.deleteOrphanFiles(table)
.compareToFileList(compareToFileListWithOutsideLocation)
.deleteWith(s -> { })
.execute();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index e0b1e53cfe..db08619597 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -79,7 +79,7 @@ import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
-import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
+import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.RewriteExecutionContext;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;