You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by et...@apache.org on 2023/05/16 05:27:31 UTC

[iceberg] branch master updated: API, Core, Spark: Add file groups failure in rewrite result (#7361)

This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 42cf1579ab API, Core, Spark: Add file groups failure in rewrite result (#7361)
42cf1579ab is described below

commit 42cf1579ab97ff0080630c752e877593882a4fac
Author: waltczhang <35...@users.noreply.github.com>
AuthorDate: Tue May 16 13:27:26 2023 +0800

    API, Core, Spark: Add file groups failure in rewrite result (#7361)
---
 .../apache/iceberg/actions/RewriteDataFiles.java   | 19 ++++++++++++++++
 .../extensions/TestRewriteDataFilesProcedure.java  | 26 +++++++++++-----------
 .../spark/actions/RewriteDataFilesSparkAction.java | 17 +++++++++++---
 .../procedures/RewriteDataFilesProcedure.java      | 12 ++++++++--
 .../spark/actions/TestRewriteDataFilesAction.java  |  8 +++++--
 5 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index c73ea300c7..20e75159b1 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.immutables.value.Value;
 
 /**
@@ -181,6 +182,11 @@ public interface RewriteDataFiles
   interface Result {
     List<FileGroupRewriteResult> rewriteResults();
 
+    @Value.Default
+    default List<FileGroupFailureResult> rewriteFailures() {
+      return ImmutableList.of();
+    }
+
     @Value.Default
     default int addedDataFilesCount() {
       return rewriteResults().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum();
@@ -197,6 +203,11 @@ public interface RewriteDataFiles
     default long rewrittenBytesCount() {
       return rewriteResults().stream().mapToLong(FileGroupRewriteResult::rewrittenBytesCount).sum();
     }
+
+    @Value.Default
+    default int failedDataFilesCount() {
+      return rewriteFailures().stream().mapToInt(FileGroupFailureResult::dataFilesCount).sum();
+    }
   }
 
   /**
@@ -217,6 +228,14 @@ public interface RewriteDataFiles
     }
   }
 
+  /** The result for a file group that failed to rewrite: its info and its data files count. */
+  @Value.Immutable
+  interface FileGroupFailureResult {
+    FileGroupInfo info();
+
+    int dataFilesCount();
+  }
+
   /**
    * A description of a file group, when it was processed, and within which partition. For use
    * tracking rewrite operations and for returning results.
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 6cda93f867..3c07e676a5 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -83,7 +83,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
   public void testRewriteDataFilesInEmptyTable() {
     createTable();
     List<Object[]> output = sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L)), output);
+    assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L, 0)), output);
   }
 
   @Test
@@ -101,7 +101,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(10, 2),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -125,7 +125,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(10, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -149,7 +149,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 0 data files and add 0 data files",
-        ImmutableList.of(row(0, 0, 0L)),
+        ImmutableList.of(row(0, 0, 0L, 0)),
         output);
 
     List<Object[]> actualRecords = currentData();
@@ -175,7 +175,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(10, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -202,7 +202,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(10, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -244,7 +244,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(5, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -272,7 +272,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(5, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -300,7 +300,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(5, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -540,7 +540,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(10, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isEqualTo(
             Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
@@ -579,7 +579,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(10, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(
@@ -619,7 +619,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(10, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(
@@ -655,7 +655,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
         row(2, 1),
         Arrays.copyOf(output.get(0), 2));
     // verify rewritten bytes separately
-    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)).hasSize(4);
     assertThat(output.get(0)[2])
         .isInstanceOf(Long.class)
         .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 5f95ef3ed4..658d3a9279 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.actions;
 import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -337,14 +338,21 @@ public class RewriteDataFilesSparkAction
         commitManager.service(groupsPerCommit);
     commitService.start();
 
+    Collection<FileGroupFailureResult> rewriteFailures = new ConcurrentLinkedQueue<>();
     // Start rewrite tasks
     Tasks.foreach(groupStream)
         .suppressFailureWhenFinished()
         .executeWith(rewriteService)
         .noRetry()
         .onFailure(
-            (fileGroup, exception) ->
-                LOG.error("Failure during rewrite group {}", fileGroup.info(), exception))
+            (fileGroup, exception) -> {
+              LOG.error("Failure during rewrite group {}", fileGroup.info(), exception);
+              rewriteFailures.add(
+                  ImmutableRewriteDataFiles.FileGroupFailureResult.builder()
+                      .info(fileGroup.info())
+                      .dataFilesCount(fileGroup.numFiles())
+                      .build());
+            })
         .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
     rewriteService.shutdown();
 
@@ -362,7 +370,10 @@ public class RewriteDataFilesSparkAction
 
     List<FileGroupRewriteResult> rewriteResults =
         commitResults.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
-    return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
+    return ImmutableRewriteDataFiles.Result.builder()
+        .rewriteResults(rewriteResults)
+        .rewriteFailures(rewriteFailures)
+        .build();
   }
 
   Stream<RewriteFileGroup> toGroupStream(
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 1aea61e747..3929e346c3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -68,7 +68,9 @@ class RewriteDataFilesProcedure extends BaseProcedure {
                 "rewritten_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
             new StructField(
                 "added_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
-            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty())
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField(
+                "failed_data_files_count", DataTypes.IntegerType, false, Metadata.empty())
           });
 
   public static ProcedureBuilder builder() {
@@ -216,8 +218,14 @@ class RewriteDataFilesProcedure extends BaseProcedure {
     int rewrittenDataFilesCount = result.rewrittenDataFilesCount();
     long rewrittenBytesCount = result.rewrittenBytesCount();
     int addedDataFilesCount = result.addedDataFilesCount();
+    int failedDataFilesCount = result.failedDataFilesCount();
+
     InternalRow row =
-        newInternalRow(rewrittenDataFilesCount, addedDataFilesCount, rewrittenBytesCount);
+        newInternalRow(
+            rewrittenDataFilesCount,
+            addedDataFilesCount,
+            rewrittenBytesCount,
+            failedDataFilesCount);
     return new InternalRow[] {row};
   }
 
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index a638776033..76b8f58d9a 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -755,7 +755,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
     RewriteDataFiles.Result result = spyRewrite.execute();
 
-    Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7);
+    assertThat(result.rewriteResults()).hasSize(7);
+    assertThat(result.rewriteFailures()).hasSize(3);
+    assertThat(result.failedDataFilesCount()).isEqualTo(6);
     assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     table.refresh();
@@ -796,7 +798,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
     RewriteDataFiles.Result result = spyRewrite.execute();
 
-    Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7);
+    assertThat(result.rewriteResults()).hasSize(7);
+    assertThat(result.rewriteFailures()).hasSize(3);
+    assertThat(result.failedDataFilesCount()).isEqualTo(6);
     assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     table.refresh();