You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/24 14:56:26 UTC
[iceberg] branch 0.13.x updated: Spark: Handle CommitStateUnknown exception in RewriteManifestSparkAction (#4836) (#4852)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/0.13.x by this push:
new c2eee0a004 Spark: Handle CommitStateUnknown exception in RewriteManifestSparkAction (#4836) (#4852)
c2eee0a004 is described below
commit c2eee0a0046ed79f105f67a8194db5885778d35d
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Tue May 24 16:56:17 2022 +0200
Spark: Handle CommitStateUnknown exception in RewriteManifestSparkAction (#4836) (#4852)
Co-authored-by: Prashant Singh <35...@users.noreply.github.com>
Co-authored-by: Prashant Singh <ps...@amazon.com>
---
.../actions/BaseRewriteManifestsSparkAction.java | 4 ++
.../spark/actions/TestRewriteManifestsAction.java | 69 ++++++++++++++++++++++
2 files changed, 73 insertions(+)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index 05a845102c..f887fedd47 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
@@ -295,6 +296,9 @@ public class BaseRewriteManifestsSparkAction
// delete new manifests as they were rewritten before the commit
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}
+ } catch (CommitStateUnknownException commitStateUnknownException) {
+ // don't clean up added manifest files, because they may have been successfully committed.
+ throw commitStateUnknownException;
} catch (Exception e) {
// delete all new manifests because the rewrite failed
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index ec5c54d21e..40adb7d4c9 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -31,6 +32,7 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -52,6 +54,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestRewriteManifestsAction extends SparkTestBase {
@@ -157,6 +162,70 @@ public class TestRewriteManifestsAction extends SparkTestBase {
Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
}
+ @Test
+ public void testRewriteManifestsWithCommitStateUnknownException() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ List<ThreeColumnRecord> records1 = Lists.newArrayList(
+ new ThreeColumnRecord(1, null, "AAAA"),
+ new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+ );
+ writeRecords(records1);
+
+ List<ThreeColumnRecord> records2 = Lists.newArrayList(
+ new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+ new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+ );
+ writeRecords(records2);
+
+ table.refresh();
+
+ List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+ Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+ SparkActions actions = SparkActions.get();
+
+ // spy whose commit() first runs the real commit and then throws CommitStateUnknownException (commit succeeded, outcome unknown)
+ org.apache.iceberg.RewriteManifests newRewriteManifests = table.rewriteManifests();
+ org.apache.iceberg.RewriteManifests spyNewRewriteManifests = spy(newRewriteManifests);
+ doAnswer(invocation -> {
+ newRewriteManifests.commit();
+ throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+ }).when(spyNewRewriteManifests).commit();
+
+ Table spyTable = spy(table);
+ when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
+
+ AssertHelpers.assertThrowsCause("Should throw a Commit State Unknown Exception",
+ RuntimeException.class,
+ "Datacenter on Fire",
+ () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());
+
+ table.refresh();
+
+ // the real commit ran before the exception was thrown, so the refreshed table must show the rewritten manifest
+ List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+ Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+ Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
+ Assert.assertFalse(newManifests.get(0).hasAddedFiles());
+ Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+
+ List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(records1);
+ expectedRecords.addAll(records2);
+
+ Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+ List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+ .as(Encoders.bean(ThreeColumnRecord.class))
+ .collectAsList();
+
+ Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ }
+
@Test
public void testRewriteSmallManifestsPartitionedTable() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)