You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/05/19 23:09:45 UTC
[iceberg] branch master updated: Spark 3.4: Harmonize RewriteDataFilesSparkAction code with RewritePositionDeleteFilesSparkAction (#7630)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8ace681824 Spark 3.4: Harmonize RewriteDataFilesSparkAction code with RewritePositionDeleteFilesSparkAction (#7630)
8ace681824 is described below
commit 8ace681824c1c11ac06c3e8d2f86be8eb2b75600
Author: Ajantha Bhat <aj...@gmail.com>
AuthorDate: Sat May 20 04:39:39 2023 +0530
Spark 3.4: Harmonize RewriteDataFilesSparkAction code with RewritePositionDeleteFilesSparkAction (#7630)
---
.../apache/iceberg/actions/RewriteFileGroup.java | 17 +++
.../spark/actions/RewriteDataFilesSparkAction.java | 163 +++++++++------------
.../spark/actions/TestRewriteDataFilesAction.java | 3 +-
3 files changed, 91 insertions(+), 92 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
index bb62f79711..f816b5d7a4 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
@@ -19,11 +19,13 @@
package org.apache.iceberg.actions;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -92,4 +94,19 @@ public class RewriteFileGroup {
public int numFiles() {
return fileScanTasks.size();
}
+
+ public static Comparator<RewriteFileGroup> comparator(RewriteJobOrder rewriteJobOrder) {
+ switch (rewriteJobOrder) {
+ case BYTES_ASC:
+ return Comparator.comparing(RewriteFileGroup::sizeInBytes);
+ case BYTES_DESC:
+ return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
+ case FILES_ASC:
+ return Comparator.comparing(RewriteFileGroup::numFiles);
+ case FILES_DESC:
+ return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
+ default:
+ return (fileGroupOne, fileGroupTwo) -> 0;
+ }
+ }
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 658d3a9279..6b5628a1f4 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,6 +83,9 @@ public class RewriteDataFilesSparkAction
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER);
+ private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
+ ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+
private final Table table;
private Expression filter = Expressions.alwaysTrue();
@@ -147,7 +149,7 @@ public class RewriteDataFilesSparkAction
@Override
public RewriteDataFiles.Result execute() {
if (table.currentSnapshot() == null) {
- return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+ return EMPTY_RESULT;
}
long startingSnapshotId = table.currentSnapshot().snapshotId();
@@ -159,26 +161,25 @@ public class RewriteDataFilesSparkAction
validateAndInitOptions();
- Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+ StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
planFileGroups(startingSnapshotId);
RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
if (ctx.totalGroupCount() == 0) {
LOG.info("Nothing found to rewrite in {}", table.name());
- return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+ return EMPTY_RESULT;
}
Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
- RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
if (partialProgressEnabled) {
- return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+ return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
} else {
- return doExecute(ctx, groupStream, commitManager);
+ return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
}
}
- Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
CloseableIterable<FileScanTask> fileScanTasks =
table
.newScan()
@@ -189,43 +190,9 @@ public class RewriteDataFilesSparkAction
try {
StructType partitionType = table.spec().partitionType();
- StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
- StructLike emptyStruct = GenericRecord.create(partitionType);
-
- fileScanTasks.forEach(
- task -> {
- // If a task uses an incompatible partition spec the data inside could contain values
- // which
- // belong to multiple partitions in the current spec. Treating all such files as
- // un-partitioned and
- // grouping them together helps to minimize new files made.
- StructLike taskPartition =
- task.file().specId() == table.spec().specId()
- ? task.file().partition()
- : emptyStruct;
-
- List<FileScanTask> files = filesByPartition.get(taskPartition);
- if (files == null) {
- files = Lists.newArrayList();
- }
-
- files.add(task);
- filesByPartition.put(taskPartition, files);
- });
-
- StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
- StructLikeMap.create(partitionType);
-
- filesByPartition.forEach(
- (partition, tasks) -> {
- Iterable<List<FileScanTask>> plannedFileGroups = rewriter.planFileGroups(tasks);
- List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(plannedFileGroups);
- if (fileGroups.size() > 0) {
- fileGroupsByPartition.put(partition, fileGroups);
- }
- });
-
- return fileGroupsByPartition;
+ StructLikeMap<List<FileScanTask>> filesByPartition =
+ groupByPartition(partitionType, fileScanTasks);
+ return fileGroupsByPartition(filesByPartition);
} finally {
try {
fileScanTasks.close();
@@ -235,6 +202,38 @@ public class RewriteDataFilesSparkAction
}
}
+ private StructLikeMap<List<FileScanTask>> groupByPartition(
+ StructType partitionType, Iterable<FileScanTask> tasks) {
+ StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
+ StructLike emptyStruct = GenericRecord.create(partitionType);
+
+ for (FileScanTask task : tasks) {
+ // If a task uses an incompatible partition spec, the data inside could contain values
+ // that belong to multiple partitions in the current spec. Treating all such files as
+ // unpartitioned and grouping them together helps minimize the number of new files.
+ StructLike taskPartition =
+ task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
+
+ List<FileScanTask> files = filesByPartition.get(taskPartition);
+ if (files == null) {
+ files = Lists.newArrayList();
+ }
+
+ files.add(task);
+ filesByPartition.put(taskPartition, files);
+ }
+ return filesByPartition;
+ }
+
+ private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
+ StructLikeMap<List<FileScanTask>> filesByPartition) {
+ return filesByPartition.transformValues(this::planFileGroups);
+ }
+
+ private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
+ return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
+ }
+
@VisibleForTesting
RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
String desc = jobDesc(fileGroup, ctx);
@@ -300,7 +299,7 @@ public class RewriteDataFilesSparkAction
Tasks.foreach(rewrittenGroups)
.suppressFailureWhenFinished()
- .run(group -> commitManager.abortFileGroup(group));
+ .run(commitManager::abortFileGroup);
throw e;
} finally {
rewriteService.shutdown();
@@ -332,14 +331,14 @@ public class RewriteDataFilesSparkAction
RewriteDataFilesCommitManager commitManager) {
ExecutorService rewriteService = rewriteService();
- // Start Commit Service
+ // start commit service
int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
RewriteDataFilesCommitManager.CommitService commitService =
commitManager.service(groupsPerCommit);
commitService.start();
Collection<FileGroupFailureResult> rewriteFailures = new ConcurrentLinkedQueue<>();
- // Start rewrite tasks
+ // start rewrite tasks
Tasks.foreach(groupStream)
.suppressFailureWhenFinished()
.executeWith(rewriteService)
@@ -356,7 +355,7 @@ public class RewriteDataFilesSparkAction
.run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
rewriteService.shutdown();
- // Stop Commit service
+ // stop commit service
commitService.close();
List<RewriteFileGroup> commitResults = commitService.results();
if (commitResults.size() == 0) {
@@ -377,45 +376,29 @@ public class RewriteDataFilesSparkAction
}
Stream<RewriteFileGroup> toGroupStream(
- RewriteExecutionContext ctx,
- Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
- Stream<RewriteFileGroup> rewriteFileGroupStream =
- fileGroupsByPartition.entrySet().stream()
- .flatMap(
- e -> {
- StructLike partition = e.getKey();
- List<List<FileScanTask>> fileGroups = e.getValue();
- return fileGroups.stream()
- .map(
- tasks -> {
- int globalIndex = ctx.currentGlobalIndex();
- int partitionIndex = ctx.currentPartitionIndex(partition);
- FileGroupInfo info =
- ImmutableRewriteDataFiles.FileGroupInfo.builder()
- .globalIndex(globalIndex)
- .partitionIndex(partitionIndex)
- .partition(partition)
- .build();
- return new RewriteFileGroup(info, tasks);
- });
- });
-
- return rewriteFileGroupStream.sorted(rewriteGroupComparator());
+ RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
+ return groupsByPartition.entrySet().stream()
+ .filter(e -> e.getValue().size() != 0)
+ .flatMap(
+ e -> {
+ StructLike partition = e.getKey();
+ List<List<FileScanTask>> scanGroups = e.getValue();
+ return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+ })
+ .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
}
- private Comparator<RewriteFileGroup> rewriteGroupComparator() {
- switch (rewriteJobOrder) {
- case BYTES_ASC:
- return Comparator.comparing(RewriteFileGroup::sizeInBytes);
- case BYTES_DESC:
- return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
- case FILES_ASC:
- return Comparator.comparing(RewriteFileGroup::numFiles);
- case FILES_DESC:
- return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
- default:
- return (fileGroupOne, fileGroupTwo) -> 0;
- }
+ private RewriteFileGroup newRewriteGroup(
+ RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
+ int globalIndex = ctx.currentGlobalIndex();
+ int partitionIndex = ctx.currentPartitionIndex(partition);
+ FileGroupInfo info =
+ ImmutableRewriteDataFiles.FileGroupInfo.builder()
+ .globalIndex(globalIndex)
+ .partitionIndex(partitionIndex)
+ .partition(partition)
+ .build();
+ return new RewriteFileGroup(info, tasks);
}
void validateAndInitOptions() {
@@ -495,15 +478,13 @@ public class RewriteDataFilesSparkAction
@VisibleForTesting
static class RewriteExecutionContext {
- private final Map<StructLike, Integer> numGroupsByPartition;
+ private final StructLikeMap<Integer> numGroupsByPartition;
private final int totalGroupCount;
private final Map<StructLike, Integer> partitionIndexMap;
private final AtomicInteger groupIndex;
- RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
- this.numGroupsByPartition =
- fileGroupsByPartition.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+ RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
+ this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
this.partitionIndexMap = Maps.newConcurrentMap();
this.groupIndex = new AtomicInteger(1);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 76b8f58d9a..bf4bef74c3 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -100,6 +100,7 @@ import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.internal.SQLConf;
@@ -1390,7 +1391,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
- Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+ StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
rewrite.planFileGroups(table.currentSnapshot().snapshotId());
return rewrite.toGroupStream(