You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/24 14:56:26 UTC

[iceberg] branch 0.13.x updated: Spark: Handle CommitStateUnknown exception in RewriteManifestSparkAction (#4836) (#4852)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new c2eee0a004 Spark: Handle CommitStateUnknown exception in RewriteManifestSparkAction (#4836) (#4852)
c2eee0a004 is described below

commit c2eee0a0046ed79f105f67a8194db5885778d35d
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Tue May 24 16:56:17 2022 +0200

    Spark: Handle CommitStateUnknown exception in RewriteManifestSparkAction (#4836) (#4852)
    
    Co-authored-by: Prashant Singh <35...@users.noreply.github.com>
    Co-authored-by: Prashant Singh <ps...@amazon.com>
---
 .../actions/BaseRewriteManifestsSparkAction.java   |  4 ++
 .../spark/actions/TestRewriteManifestsAction.java  | 69 ++++++++++++++++++++++
 2 files changed, 73 insertions(+)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index 05a845102c..f887fedd47 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -295,6 +296,9 @@ public class BaseRewriteManifestsSparkAction
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      // don't clean up added manifest files, because they may have been successfully committed.
+      throw commitStateUnknownException;
     } catch (Exception e) {
       // delete all new manifests because the rewrite failed
       deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index ec5c54d21e..40adb7d4c9 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -31,6 +32,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +54,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestRewriteManifestsAction extends SparkTestBase {
@@ -157,6 +162,70 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteManifestsWithCommitStateUnknownException() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+    SparkActions actions = SparkActions.get();
+
+    // create a spy that performs the real commit and then throws a CommitStateUnknownException.
+    org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
+    org.apache.iceberg.RewriteManifests spyNewRewriteManifests = spy(newRewriteManifests);
+    doAnswer(invocation -> {
+      newRewriteManifests.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRewriteManifests).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
+
+    AssertHelpers.assertThrowsCause("Should throw a Commit State Unknown Exception",
+        RuntimeException.class,
+        "Datacenter on Fire",
+        () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());
+
+    table.refresh();
+
+    // table should reflect the changes, since the commit was successful
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
+    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
+    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteSmallManifestsPartitionedTable() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)