You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/17 16:46:46 UTC
[iceberg] branch 0.13.x updated: Core: Fix transaction retry logic (#4464) (#4783)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/0.13.x by this push:
new 7d1cd3cdc Core: Fix transaction retry logic (#4464) (#4783)
7d1cd3cdc is described below
commit 7d1cd3cdcc458a5eb249bdba070d96fa88bcc6bf
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Tue May 17 18:46:41 2022 +0200
Core: Fix transaction retry logic (#4464) (#4783)
Co-authored-by: Ajantha Bhat <aj...@gmail.com>
---
.../java/org/apache/iceberg/BaseTransaction.java | 78 +++++++++++++++-------
.../java/org/apache/iceberg/TestTransaction.java | 32 +++++++++
2 files changed, 87 insertions(+), 23 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 50550ab09..b1fdaaa4e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -346,14 +346,7 @@ class BaseTransaction implements Transaction {
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(underlyingOps -> {
- if (base != underlyingOps.refresh()) {
- this.base = underlyingOps.current(); // just refreshed
- this.current = base;
- for (PendingUpdate update : updates) {
- // re-commit each update in the chain to apply it and update current
- update.commit();
- }
- }
+ applyUpdates(underlyingOps);
if (current.currentSnapshot() != null) {
currentSnapshotId.set(current.currentSnapshot().snapshotId());
@@ -366,22 +359,11 @@ class BaseTransaction implements Transaction {
} catch (CommitStateUnknownException e) {
throw e;
+ } catch (PendingUpdateFailedException e) {
+ cleanUpOnCommitFailure();
+ throw e.wrapped();
} catch (RuntimeException e) {
- // the commit failed and no files were committed. clean up each update.
- Tasks.foreach(updates)
- .suppressFailureWhenFinished()
- .run(update -> {
- if (update instanceof SnapshotProducer) {
- ((SnapshotProducer) update).cleanAll();
- }
- });
-
- // delete all files that were cleaned up
- Tasks.foreach(deletedFiles)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
- .run(ops.io()::deleteFile);
-
+ cleanUpOnCommitFailure();
throw e;
}
@@ -415,6 +397,40 @@ class BaseTransaction implements Transaction {
}
}
+ private void cleanUpOnCommitFailure() {
+ // The commit failed and no files were committed: ask every SnapshotProducer update to cleanAll().
+ Tasks.foreach(updates)
+ .suppressFailureWhenFinished()
+ .run(update -> {
+ if (update instanceof SnapshotProducer) {
+ ((SnapshotProducer) update).cleanAll();
+ }
+ });
+
+ // Best-effort delete of every path in deletedFiles; per-file failures are only logged (onFailure).
+ Tasks.foreach(deletedFiles)
+ .suppressFailureWhenFinished()
+ .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+ .run(ops.io()::deleteFile);
+ }
+
+ private void applyUpdates(TableOperations underlyingOps) {
+ if (base != underlyingOps.refresh()) {
+ // the table changed underneath us: rebase the transaction on the refreshed metadata
+ this.base = underlyingOps.current();
+ this.current = underlyingOps.current();
+ for (PendingUpdate update : updates) {
+ // re-commit each update in the chain to apply it and update current
+ try {
+ update.commit();
+ } catch (CommitFailedException e) {
+ // conflicting metadata changes cannot be fixed by retrying; wrap so the retry loop (onlyRetryOn CommitFailedException) stops
+ throw new PendingUpdateFailedException(e);
+ }
+ }
+ }
+ }
+
private static Set<String> committedFiles(TableOperations ops, Set<Long> snapshotIds) {
if (snapshotIds.isEmpty()) {
return null;
@@ -706,4 +722,20 @@ class BaseTransaction implements Transaction {
Set<String> deletedFiles() {
return deletedFiles;
}
+
+ /**
+ * Wrapper that stops the transaction retry loop when re-committing a {@link PendingUpdate} fails with a {@link CommitFailedException}.
+ */
+ private static class PendingUpdateFailedException extends RuntimeException {
+ private final CommitFailedException wrapped; // the original failure; same object as getCause()
+
+ private PendingUpdateFailedException(CommitFailedException cause) {
+ super(cause);
+ this.wrapped = cause;
+ }
+
+ public CommitFailedException wrapped() { // typed access to the cause, avoiding a cast of getCause()
+ return wrapped;
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 2b46118c7..36eb7fbc6 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -420,6 +421,37 @@ public class TestTransaction extends TableTestBase {
Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
}
+ @Test
+ public void testTransactionRetrySchemaUpdate() {
+ // use only one retry
+ table.updateProperties()
+ .set(TableProperties.COMMIT_NUM_RETRIES, "1")
+ .commit();
+
+ // start a transaction
+ Transaction txn = table.newTransaction();
+ // add column "new-column"
+ txn.updateSchema()
+ .addColumn("new-column", Types.IntegerType.get())
+ .commit();
+ int schemaId = txn.table().schema().schemaId();
+
+ // directly update the table to add "another-column"; this concurrent change makes the in-progress txn commit fail
+ table.updateSchema()
+ .addColumn("another-column", Types.IntegerType.get())
+ .commit();
+ int conflictingSchemaId = table.schema().schemaId();
+
+ Assert.assertEquals("Both schema IDs should be the same in order to cause a conflict",
+ conflictingSchemaId,
+ schemaId);
+
+ // committing the transaction must fail: re-applying "new-column" conflicts with the concurrent schema change
+ AssertHelpers.assertThrows("Should fail due to conflicting transaction even after retry",
+ CommitFailedException.class, "Table metadata refresh is required",
+ txn::commitTransaction);
+ }
+
@Test
public void testTransactionRetryMergeCleanup() {
// use only one retry and aggressively merge manifests