You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/17 16:46:46 UTC

[iceberg] branch 0.13.x updated: Core: Fix transaction retry logic (#4464) (#4783)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new 7d1cd3cdc Core: Fix transaction retry logic (#4464) (#4783)
7d1cd3cdc is described below

commit 7d1cd3cdcc458a5eb249bdba070d96fa88bcc6bf
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Tue May 17 18:46:41 2022 +0200

    Core: Fix transaction retry logic (#4464) (#4783)
    
    Co-authored-by: Ajantha Bhat <aj...@gmail.com>
---
 .../java/org/apache/iceberg/BaseTransaction.java   | 78 +++++++++++++++-------
 .../java/org/apache/iceberg/TestTransaction.java   | 32 +++++++++
 2 files changed, 87 insertions(+), 23 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 50550ab09..b1fdaaa4e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -346,14 +346,7 @@ class BaseTransaction implements Transaction {
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(underlyingOps -> {
-            if (base != underlyingOps.refresh()) {
-              this.base = underlyingOps.current(); // just refreshed
-              this.current = base;
-              for (PendingUpdate update : updates) {
-                // re-commit each update in the chain to apply it and update current
-                update.commit();
-              }
-            }
+            applyUpdates(underlyingOps);
 
             if (current.currentSnapshot() != null) {
               currentSnapshotId.set(current.currentSnapshot().snapshotId());
@@ -366,22 +359,11 @@ class BaseTransaction implements Transaction {
     } catch (CommitStateUnknownException e) {
       throw e;
 
+    } catch (PendingUpdateFailedException e) {
+      cleanUpOnCommitFailure();
+      throw e.wrapped();
     } catch (RuntimeException e) {
-      // the commit failed and no files were committed. clean up each update.
-      Tasks.foreach(updates)
-          .suppressFailureWhenFinished()
-          .run(update -> {
-            if (update instanceof SnapshotProducer) {
-              ((SnapshotProducer) update).cleanAll();
-            }
-          });
-
-      // delete all files that were cleaned up
-      Tasks.foreach(deletedFiles)
-          .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
-          .run(ops.io()::deleteFile);
-
+      cleanUpOnCommitFailure();
       throw e;
     }
 
@@ -415,6 +397,40 @@ class BaseTransaction implements Transaction {
     }
   }
 
+  private void cleanUpOnCommitFailure() {
+    // the commit failed and no files were committed: call cleanAll() on every SnapshotProducer update.
+    Tasks.foreach(updates)
+        .suppressFailureWhenFinished()
+        .run(update -> {
+          if (update instanceof SnapshotProducer) {
+            ((SnapshotProducer) update).cleanAll();
+          }
+        });
+
+    // delete every file recorded in deletedFiles; a failed delete is only logged as a warning
+    Tasks.foreach(deletedFiles)
+        .suppressFailureWhenFinished()
+        .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+        .run(ops.io()::deleteFile);
+  }
+
+  private void applyUpdates(TableOperations underlyingOps) {
+    if (base != underlyingOps.refresh()) {
+      // the table changed underneath this transaction: rebase on the refreshed metadata
+      this.base = underlyingOps.current();
+      this.current = underlyingOps.current();
+      for (PendingUpdate update : updates) {
+        // re-commit each update in the chain to apply it and update current
+        try {
+          update.commit();
+        } catch (CommitFailedException e) {
+          // Retrying cannot succeed because of conflicting metadata changes, so wrap the failure to break the retry loop.
+          throw new PendingUpdateFailedException(e);
+        }
+      }
+    }
+  }
+
   private static Set<String> committedFiles(TableOperations ops, Set<Long> snapshotIds) {
     if (snapshotIds.isEmpty()) {
       return null;
@@ -706,4 +722,20 @@ class BaseTransaction implements Transaction {
   Set<String> deletedFiles() {
     return deletedFiles;
   }
+
+  /**
+   * Exception used to avoid retrying a {@link PendingUpdate} when it fails with a {@link CommitFailedException}.
+   */
+  private static class PendingUpdateFailedException extends RuntimeException {
+    private final CommitFailedException wrapped;
+
+    private PendingUpdateFailedException(CommitFailedException cause) {
+      super(cause);
+      this.wrapped = cause;
+    }
+
+    public CommitFailedException wrapped() {
+      return wrapped;
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 2b46118c7..36eb7fbc6 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -420,6 +421,37 @@ public class TestTransaction extends TableTestBase {
     Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
   }
 
+  @Test
+  public void testTransactionRetrySchemaUpdate() {
+    // use only one retry
+    table.updateProperties()
+        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
+        .commit();
+
+    // start a transaction
+    Transaction txn = table.newTransaction();
+    // add column "new-column"
+    txn.updateSchema()
+        .addColumn("new-column", Types.IntegerType.get())
+        .commit();
+    int schemaId = txn.table().schema().schemaId();
+
+    // directly update the table to add "another-column" (which causes the in-progress txn commit to fail)
+    table.updateSchema()
+        .addColumn("another-column", Types.IntegerType.get())
+        .commit();
+    int conflictingSchemaId = table.schema().schemaId();
+
+    Assert.assertEquals("Both schema IDs should be the same in order to cause a conflict",
+        conflictingSchemaId,
+        schemaId);
+
+    // commit the transaction for adding "new-column"; the commit is expected to throw CommitFailedException
+    AssertHelpers.assertThrows("Should fail due to conflicting transaction even after retry",
+        CommitFailedException.class, "Table metadata refresh is required",
+        txn::commitTransaction);
+  }
+
   @Test
   public void testTransactionRetryMergeCleanup() {
     // use only one retry and aggressively merge manifests