You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by et...@apache.org on 2023/05/16 05:27:31 UTC
[iceberg] branch master updated: API, Core, Spark: Add file groups failure in rewrite result (#7361)
This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 42cf1579ab API, Core, Spark: Add file groups failure in rewrite result (#7361)
42cf1579ab is described below
commit 42cf1579ab97ff0080630c752e877593882a4fac
Author: waltczhang <35...@users.noreply.github.com>
AuthorDate: Tue May 16 13:27:26 2023 +0800
API, Core, Spark: Add file groups failure in rewrite result (#7361)
---
.../apache/iceberg/actions/RewriteDataFiles.java | 19 ++++++++++++++++
.../extensions/TestRewriteDataFilesProcedure.java | 26 +++++++++++-----------
.../spark/actions/RewriteDataFilesSparkAction.java | 17 +++++++++++---
.../procedures/RewriteDataFilesProcedure.java | 12 ++++++++--
.../spark/actions/TestRewriteDataFilesAction.java | 8 +++++--
5 files changed, 62 insertions(+), 20 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index c73ea300c7..20e75159b1 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.immutables.value.Value;
/**
@@ -181,6 +182,11 @@ public interface RewriteDataFiles
interface Result {
List<FileGroupRewriteResult> rewriteResults();
+ @Value.Default
+ default List<FileGroupFailureResult> rewriteFailures() {
+ return ImmutableList.of();
+ }
+
@Value.Default
default int addedDataFilesCount() {
return rewriteResults().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum();
@@ -197,6 +203,11 @@ public interface RewriteDataFiles
default long rewrittenBytesCount() {
return rewriteResults().stream().mapToLong(FileGroupRewriteResult::rewrittenBytesCount).sum();
}
+
+ @Value.Default
+ default int failedDataFilesCount() {
+ return rewriteFailures().stream().mapToInt(FileGroupFailureResult::dataFilesCount).sum();
+ }
}
/**
@@ -217,6 +228,14 @@ public interface RewriteDataFiles
}
}
+ /** For a file group that failed to rewrite. */
+ @Value.Immutable
+ interface FileGroupFailureResult {
+ FileGroupInfo info();
+
+ int dataFilesCount();
+ }
+
/**
* A description of a file group, when it was processed, and within which partition. For use
* tracking rewrite operations and for returning results.
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 6cda93f867..3c07e676a5 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -83,7 +83,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
public void testRewriteDataFilesInEmptyTable() {
createTable();
List<Object[]> output = sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
- assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L)), output);
+ assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L, 0)), output);
}
@Test
@@ -101,7 +101,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(10, 2),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -125,7 +125,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -149,7 +149,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
assertEquals(
"Action should rewrite 0 data files and add 0 data files",
- ImmutableList.of(row(0, 0, 0L)),
+ ImmutableList.of(row(0, 0, 0L, 0)),
output);
List<Object[]> actualRecords = currentData();
@@ -175,7 +175,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -202,7 +202,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -244,7 +244,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -272,7 +272,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -300,7 +300,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -540,7 +540,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isEqualTo(
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -579,7 +579,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(
@@ -619,7 +619,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(
@@ -655,7 +655,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
row(2, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
- assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 5f95ef3ed4..658d3a9279 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.actions;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -337,14 +338,21 @@ public class RewriteDataFilesSparkAction
commitManager.service(groupsPerCommit);
commitService.start();
+ Collection<FileGroupFailureResult> rewriteFailures = new ConcurrentLinkedQueue<>();
// Start rewrite tasks
Tasks.foreach(groupStream)
.suppressFailureWhenFinished()
.executeWith(rewriteService)
.noRetry()
.onFailure(
- (fileGroup, exception) ->
- LOG.error("Failure during rewrite group {}", fileGroup.info(), exception))
+ (fileGroup, exception) -> {
+ LOG.error("Failure during rewrite group {}", fileGroup.info(), exception);
+ rewriteFailures.add(
+ ImmutableRewriteDataFiles.FileGroupFailureResult.builder()
+ .info(fileGroup.info())
+ .dataFilesCount(fileGroup.numFiles())
+ .build());
+ })
.run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
rewriteService.shutdown();
@@ -362,7 +370,10 @@ public class RewriteDataFilesSparkAction
List<FileGroupRewriteResult> rewriteResults =
commitResults.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
- return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
+ return ImmutableRewriteDataFiles.Result.builder()
+ .rewriteResults(rewriteResults)
+ .rewriteFailures(rewriteFailures)
+ .build();
}
Stream<RewriteFileGroup> toGroupStream(
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 1aea61e747..3929e346c3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -68,7 +68,9 @@ class RewriteDataFilesProcedure extends BaseProcedure {
"rewritten_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"added_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
- new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty())
+ new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+ new StructField(
+ "failed_data_files_count", DataTypes.IntegerType, false, Metadata.empty())
});
public static ProcedureBuilder builder() {
@@ -216,8 +218,14 @@ class RewriteDataFilesProcedure extends BaseProcedure {
int rewrittenDataFilesCount = result.rewrittenDataFilesCount();
long rewrittenBytesCount = result.rewrittenBytesCount();
int addedDataFilesCount = result.addedDataFilesCount();
+ int failedDataFilesCount = result.failedDataFilesCount();
+
InternalRow row =
- newInternalRow(rewrittenDataFilesCount, addedDataFilesCount, rewrittenBytesCount);
+ newInternalRow(
+ rewrittenDataFilesCount,
+ addedDataFilesCount,
+ rewrittenBytesCount,
+ failedDataFilesCount);
return new InternalRow[] {row};
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index a638776033..76b8f58d9a 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -755,7 +755,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
RewriteDataFiles.Result result = spyRewrite.execute();
- Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7);
+ assertThat(result.rewriteResults()).hasSize(7);
+ assertThat(result.rewriteFailures()).hasSize(3);
+ assertThat(result.failedDataFilesCount()).isEqualTo(6);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
table.refresh();
@@ -796,7 +798,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
RewriteDataFiles.Result result = spyRewrite.execute();
- Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7);
+ assertThat(result.rewriteResults()).hasSize(7);
+ assertThat(result.rewriteFailures()).hasSize(3);
+ assertThat(result.failedDataFilesCount()).isEqualTo(6);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
table.refresh();