You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/05/16 23:17:12 UTC
[iceberg] branch master updated: Spark 3.4: Fixup for RewritePositionDeleteFilesSparkAction (#7389) (#7565)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 890ca0ef73 Spark 3.4: Fixup for RewritePositionDeleteFilesSparkAction (#7389) (#7565)
890ca0ef73 is described below
commit 890ca0ef731cb38706b5deadece4d34bd9806e18
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Tue May 16 16:17:03 2023 -0700
Spark 3.4: Fixup for RewritePositionDeleteFilesSparkAction (#7389) (#7565)
---
.../org/apache/iceberg/util/StructLikeMap.java | 7 +
... => RewritePositionDeleteFilesSparkAction.java} | 166 +++++++++++----------
.../apache/iceberg/spark/actions/SparkActions.java | 4 +-
3 files changed, 93 insertions(+), 84 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
index 6af658d130..58bd030413 100644
--- a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
+++ b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Function;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -171,4 +172,10 @@ public class StructLikeMap<T> extends AbstractMap<StructLike, T> implements Map<
throw new UnsupportedOperationException("Does not support setValue.");
}
}
+
+ public <U> StructLikeMap<U> transformValues(Function<T, U> func) {
+ StructLikeMap<U> result = create(type);
+ wrapperMap.forEach((key, value) -> result.put(key.get(), func.apply(value)));
+ return result;
+ }
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
similarity index 79%
rename from spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteSparkAction.java
rename to spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index 87d945e7ae..bd8610ea73 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -47,7 +47,6 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -68,11 +67,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Spark implementation of {@link RewritePositionDeleteFiles}. */
-public class RewritePositionDeleteSparkAction
- extends BaseSnapshotUpdateSparkAction<RewritePositionDeleteSparkAction>
+public class RewritePositionDeleteFilesSparkAction
+ extends BaseSnapshotUpdateSparkAction<RewritePositionDeleteFilesSparkAction>
implements RewritePositionDeleteFiles {
- private static final Logger LOG = LoggerFactory.getLogger(RewritePositionDeleteSparkAction.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RewritePositionDeleteFilesSparkAction.class);
private static final Set<String> VALID_OPTIONS =
ImmutableSet.of(
MAX_CONCURRENT_FILE_GROUP_REWRITES,
@@ -90,19 +90,19 @@ public class RewritePositionDeleteSparkAction
private boolean partialProgressEnabled;
private RewriteJobOrder rewriteJobOrder;
- RewritePositionDeleteSparkAction(SparkSession spark, Table table) {
+ RewritePositionDeleteFilesSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
this.rewriter = new SparkBinPackPositionDeletesRewriter(spark(), table);
}
@Override
- protected RewritePositionDeleteSparkAction self() {
+ protected RewritePositionDeleteFilesSparkAction self() {
return this;
}
@Override
- public RewritePositionDeleteSparkAction filter(Expression expression) {
+ public RewritePositionDeleteFilesSparkAction filter(Expression expression) {
throw new UnsupportedOperationException("Regular filters not supported yet.");
}
@@ -115,7 +115,7 @@ public class RewritePositionDeleteSparkAction
validateAndInitOptions();
- Map<StructLike, List<List<PositionDeletesScanTask>>> fileGroupsByPartition = planFileGroups();
+ StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition = planFileGroups();
RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
if (ctx.totalGroupCount() == 0) {
@@ -125,63 +125,68 @@ public class RewritePositionDeleteSparkAction
Stream<RewritePositionDeletesGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
- RewritePositionDeletesCommitManager commitManager = commitManager();
if (partialProgressEnabled) {
- return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+ return doExecuteWithPartialProgress(ctx, groupStream, commitManager());
} else {
- return doExecute(ctx, groupStream, commitManager);
+ return doExecute(ctx, groupStream, commitManager());
}
}
- private Map<StructLike, List<List<PositionDeletesScanTask>>> planFileGroups() {
- Table deletesTable =
- MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
- CloseableIterable<PositionDeletesScanTask> scanTasks =
- CloseableIterable.transform(
- deletesTable.newBatchScan().ignoreResiduals().planFiles(),
- t -> (PositionDeletesScanTask) t);
+ private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
+ CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
try {
StructType partitionType = Partitioning.partitionType(table);
- StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
- StructLikeMap.create(partitionType);
-
- for (PositionDeletesScanTask task : scanTasks) {
- StructLike coerced = coercePartition(task, partitionType);
-
- List<PositionDeletesScanTask> partitionTasks = filesByPartition.get(coerced);
- if (partitionTasks == null) {
- partitionTasks = Lists.newArrayList();
- }
- partitionTasks.add(task);
- filesByPartition.put(coerced, partitionTasks);
- }
-
- StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition =
- StructLikeMap.create(partitionType);
-
- filesByPartition.forEach(
- (partition, partitionTasks) -> {
- Iterable<List<PositionDeletesScanTask>> plannedFileGroups =
- rewriter.planFileGroups(partitionTasks);
- List<List<PositionDeletesScanTask>> groups = ImmutableList.copyOf(plannedFileGroups);
- if (groups.size() > 0) {
- fileGroupsByPartition.put(partition, groups);
- }
- });
-
- return fileGroupsByPartition;
+ StructLikeMap<List<PositionDeletesScanTask>> fileTasksByPartition =
+ groupByPartition(partitionType, fileTasks);
+ return fileGroupsByPartition(fileTasksByPartition);
} finally {
try {
- scanTasks.close();
+ fileTasks.close();
} catch (IOException io) {
LOG.error("Cannot properly close file iterable while planning for rewrite", io);
}
}
}
- @VisibleForTesting
- RewritePositionDeletesGroup rewriteDeleteFiles(
+ private CloseableIterable<PositionDeletesScanTask> planFiles() {
+ Table deletesTable =
+ MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
+
+ return CloseableIterable.transform(
+ deletesTable.newBatchScan().ignoreResiduals().planFiles(),
+ task -> (PositionDeletesScanTask) task);
+ }
+
+ private StructLikeMap<List<PositionDeletesScanTask>> groupByPartition(
+ StructType partitionType, Iterable<PositionDeletesScanTask> tasks) {
+ StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
+ StructLikeMap.create(partitionType);
+
+ for (PositionDeletesScanTask task : tasks) {
+ StructLike coerced = coercePartition(task, partitionType);
+
+ List<PositionDeletesScanTask> partitionTasks = filesByPartition.get(coerced);
+ if (partitionTasks == null) {
+ partitionTasks = Lists.newArrayList();
+ }
+ partitionTasks.add(task);
+ filesByPartition.put(coerced, partitionTasks);
+ }
+
+ return filesByPartition;
+ }
+
+ private StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition(
+ StructLikeMap<List<PositionDeletesScanTask>> filesByPartition) {
+ return filesByPartition.transformValues(this::planFileGroups);
+ }
+
+ private List<List<PositionDeletesScanTask>> planFileGroups(List<PositionDeletesScanTask> tasks) {
+ return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
+ }
+
+ private RewritePositionDeletesGroup rewriteDeleteFiles(
RewriteExecutionContext ctx, RewritePositionDeletesGroup fileGroup) {
String desc = jobDesc(fileGroup, ctx);
Set<DeleteFile> addedFiles =
@@ -204,8 +209,7 @@ public class RewritePositionDeleteSparkAction
.build()));
}
- @VisibleForTesting
- RewritePositionDeletesCommitManager commitManager() {
+ private RewritePositionDeletesCommitManager commitManager() {
return new RewritePositionDeletesCommitManager(table);
}
@@ -282,12 +286,12 @@ public class RewritePositionDeleteSparkAction
RewritePositionDeletesCommitManager commitManager) {
ExecutorService rewriteService = rewriteService();
- // Start Commit Service
+ // start commit service
int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
CommitService commitService = commitManager.service(groupsPerCommit);
commitService.start();
- // Start rewrite tasks
+ // start rewrite tasks
Tasks.foreach(groupStream)
.suppressFailureWhenFinished()
.executeWith(rewriteService)
@@ -298,7 +302,7 @@ public class RewritePositionDeleteSparkAction
.run(fileGroup -> commitService.offer(rewriteDeleteFiles(ctx, fileGroup)));
rewriteService.shutdown();
- // Stop Commit service
+ // stop commit service
commitService.close();
List<RewritePositionDeletesGroup> commitResults = commitService.results();
if (commitResults.size() == 0) {
@@ -319,31 +323,31 @@ public class RewritePositionDeleteSparkAction
.build();
}
- Stream<RewritePositionDeletesGroup> toGroupStream(
+ private Stream<RewritePositionDeletesGroup> toGroupStream(
RewriteExecutionContext ctx,
Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
- Stream<RewritePositionDeletesGroup> rewriteFileGroupStream =
- groupsByPartition.entrySet().stream()
- .flatMap(
- e -> {
- StructLike partition = e.getKey();
- List<List<PositionDeletesScanTask>> scanGroups = e.getValue();
- return scanGroups.stream()
- .map(
- tasks -> {
- int globalIndex = ctx.currentGlobalIndex();
- int partitionIndex = ctx.currentPartitionIndex(partition);
- FileGroupInfo info =
- ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder()
- .globalIndex(globalIndex)
- .partitionIndex(partitionIndex)
- .partition(partition)
- .build();
- return new RewritePositionDeletesGroup(info, tasks);
- });
- });
-
- return rewriteFileGroupStream.sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder));
+ return groupsByPartition.entrySet().stream()
+ .filter(e -> e.getValue().size() != 0)
+ .flatMap(
+ e -> {
+ StructLike partition = e.getKey();
+ List<List<PositionDeletesScanTask>> scanGroups = e.getValue();
+ return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+ })
+ .sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder));
+ }
+
+ private RewritePositionDeletesGroup newRewriteGroup(
+ RewriteExecutionContext ctx, StructLike partition, List<PositionDeletesScanTask> tasks) {
+ int globalIndex = ctx.currentGlobalIndex();
+ int partitionIndex = ctx.currentPartitionIndex(partition);
+ FileGroupInfo info =
+ ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder()
+ .globalIndex(globalIndex)
+ .partitionIndex(partitionIndex)
+ .partition(partition)
+ .build();
+ return new RewritePositionDeletesGroup(info, tasks);
}
private void validateAndInitOptions() {
@@ -418,16 +422,14 @@ public class RewritePositionDeleteSparkAction
}
static class RewriteExecutionContext {
- private final Map<StructLike, Integer> numGroupsByPartition;
+ private final StructLikeMap<Integer> numGroupsByPartition;
private final int totalGroupCount;
private final Map<StructLike, Integer> partitionIndexMap;
private final AtomicInteger groupIndex;
RewriteExecutionContext(
- Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
- this.numGroupsByPartition =
- groupsByPartition.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+ StructLikeMap<List<List<PositionDeletesScanTask>>> fileTasksByPartition) {
+ this.numGroupsByPartition = fileTasksByPartition.transformValues(List::size);
this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
this.partitionIndexMap = Maps.newConcurrentMap();
this.groupIndex = new AtomicInteger(1);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index 64f1d91a18..fb67ded96e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -93,7 +93,7 @@ public class SparkActions implements ActionsProvider {
}
@Override
- public RewritePositionDeleteSparkAction rewritePositionDeletes(Table table) {
- return new RewritePositionDeleteSparkAction(spark, table);
+ public RewritePositionDeleteFilesSparkAction rewritePositionDeletes(Table table) {
+ return new RewritePositionDeleteFilesSparkAction(spark, table);
}
}