Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/08 20:06:25 UTC

[incubator-iceberg] branch master updated: Clean up after create and replace transaction failures (#364)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ce226cf  Clean up after create and replace transaction failures (#364)
ce226cf is described below

commit ce226cf8ba851641f789d36bd2077925e63ef65b
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Aug 8 13:06:20 2019 -0700

    Clean up after create and replace transaction failures (#364)
---
 .../java/org/apache/iceberg/BaseTransaction.java   | 252 +++++++++++++--------
 .../org/apache/iceberg/TestCreateTransaction.java  |   6 +
 .../org/apache/iceberg/TestReplaceTransaction.java |   8 +-
 3 files changed, 173 insertions(+), 93 deletions(-)
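
The change splits the commit-type switch in BaseTransaction into three dedicated methods (commitCreateTransaction, commitReplaceTransaction, commitSimpleTransaction) and adds cleanup when the final metadata commit fails: each pending update that produced a snapshot is asked to remove what it wrote, and files the transaction's operations marked as deleted are removed as well. For orientation only, a condensed sketch of that shared failure path follows; it is not a method in the commit and assumes the BaseTransaction fields shown in the diff (updates, deletedFiles, ops, LOG):

    // orientation-only sketch, not part of the commit: the cleanup the new commit
    // methods perform when the final metadata commit fails
    private void cleanUpOnCommitFailure() {
      // let each pending update that produced a snapshot remove everything it wrote
      Tasks.foreach(updates)
          .suppressFailureWhenFinished()
          .run(update -> {
            if (update instanceof SnapshotProducer) {
              ((SnapshotProducer) update).cleanAll();
            }
          });

      // nothing was committed, so files marked as deleted by the transaction's
      // operations can be removed outright
      Tasks.foreach(deletedFiles)
          .suppressFailureWhenFinished()
          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
          .run(ops.io()::deleteFile);
    }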

diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 31f6692..dd1a623 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -32,7 +32,6 @@ import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
-import org.apache.iceberg.util.Exceptions;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,107 +197,176 @@ class BaseTransaction implements Transaction {
 
     switch (type) {
       case CREATE_TABLE:
-        // fix up the snapshot log, which should not contain intermediate snapshots
-        TableMetadata createMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
-
-        // this operation creates the table. if the commit fails, this cannot retry because another
-        // process has created the same table.
-        ops.commit(null, createMetadata);
+        commitCreateTransaction();
         break;
 
       case REPLACE_TABLE:
-        // fix up the snapshot log, which should not contain intermediate snapshots
-        TableMetadata replaceMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
-
-        Tasks.foreach(ops)
-            .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-            .exponentialBackoff(
-                base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-                base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-                base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-                2.0 /* exponential */)
-            .onlyRetryOn(CommitFailedException.class)
-            .run(underlyingOps -> {
-              // because this is a replace table, it will always completely replace the table
-              // metadata. even if it was just updated.
-              if (base != underlyingOps.refresh()) {
-                this.base = underlyingOps.current(); // just refreshed
-              }
-
-              underlyingOps.commit(base, replaceMetadata);
-            });
+        commitReplaceTransaction();
         break;
 
       case SIMPLE:
-        // if there were no changes, don't try to commit
-        if (base == current) {
-          return;
-        }
-
-        // this is always set to the latest commit attempt's snapshot id.
-        AtomicLong currentSnapshotId = new AtomicLong(-1L);
-
-        try {
-          Tasks.foreach(ops)
-              .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-              .exponentialBackoff(
-                  base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-                  base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-                  base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-                  2.0 /* exponential */)
-              .onlyRetryOn(CommitFailedException.class)
-              .run(underlyingOps -> {
-                if (base != underlyingOps.refresh()) {
-                  this.base = underlyingOps.current(); // just refreshed
-                  this.current = base;
-                  for (PendingUpdate update : updates) {
-                    // re-commit each update in the chain to apply it and update current
-                    update.commit();
-                  }
-                }
-
-                currentSnapshotId.set(current.currentSnapshot().snapshotId());
-
-                // fix up the snapshot log, which should not contain intermediate snapshots
-                underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
-              });
-
-        } catch (RuntimeException e) {
-          // the commit failed and there are no committed manifests. delete any file cleaned up by an operation.
-          Exceptions.suppressAndThrow(e, () -> deletedFiles.forEach(ops.io()::deleteFile));
-        }
-
-        // the commit succeeded
-
-        try {
-          intermediateSnapshotIds.add(currentSnapshotId.get());
-
-          // clean up the data files that were deleted by each operation. first, get the list of committed manifests to
-          // ensure that no committed manifest is deleted. a manifest could be deleted in one successful operation
-          // commit, but reused in another successful commit of that operation if the whole transaction is retried.
-          Set<String> committedFiles = committedFiles(ops, intermediateSnapshotIds);
-          if (committedFiles != null) {
-            // delete all of the files that were deleted in the most recent set of operation commits
-            Tasks.foreach(deletedFiles)
-                .suppressFailureWhenFinished()
-                .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
-                .run(path -> {
-                  if (!committedFiles.contains(path)) {
-                    ops.io().deleteFile(path);
-                  }
-                });
-          } else {
-            LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up");
-          }
-
-        } catch (RuntimeException e) {
-          LOG.warn("Failed to load committed metadata, skipping clean-up", e);
-        }
-
+        commitSimpleTransaction();
         break;
     }
   }
 
+  private void commitCreateTransaction() {
+    // fix up the snapshot log, which should not contain intermediate snapshots
+    TableMetadata createMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
+
+    // this operation creates the table. if the commit fails, this cannot retry because another
+    // process has created the same table.
+    try {
+      ops.commit(null, createMetadata);
+
+    } catch (RuntimeException e) {
+      // the commit failed and no files were committed. clean up each update.
+      Tasks.foreach(updates)
+          .suppressFailureWhenFinished()
+          .run(update -> {
+            if (update instanceof SnapshotProducer) {
+              ((SnapshotProducer) update).cleanAll();
+            }
+          });
+
+      throw e;
+
+    } finally {
+      // create table never needs to retry because the table has no previous state. because retries are not a
+      // concern, it is safe to delete all of the deleted files from individual operations
+      Tasks.foreach(deletedFiles)
+          .suppressFailureWhenFinished()
+          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+          .run(ops.io()::deleteFile);
+    }
+  }
+
+  private void commitReplaceTransaction() {
+    // fix up the snapshot log, which should not contain intermediate snapshots
+    TableMetadata replaceMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
+
+    try {
+      Tasks.foreach(ops)
+          .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .onlyRetryOn(CommitFailedException.class)
+          .run(underlyingOps -> {
+            // because this is a replace table, it will always completely replace the table
+            // metadata. even if it was just updated.
+            if (base != underlyingOps.refresh()) {
+              this.base = underlyingOps.current(); // just refreshed
+            }
+
+            underlyingOps.commit(base, replaceMetadata);
+          });
+
+    } catch (RuntimeException e) {
+      // the commit failed and no files were committed. clean up each update.
+      Tasks.foreach(updates)
+          .suppressFailureWhenFinished()
+          .run(update -> {
+            if (update instanceof SnapshotProducer) {
+              ((SnapshotProducer) update).cleanAll();
+            }
+          });
+
+      throw e;
+
+    } finally {
+      // replace table never needs to retry because the table state is completely replaced. because retries are not
+      // a concern, it is safe to delete all of the deleted files from individual operations
+      Tasks.foreach(deletedFiles)
+          .suppressFailureWhenFinished()
+          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+          .run(ops.io()::deleteFile);
+    }
+  }
+
+  private void commitSimpleTransaction() {
+    // if there were no changes, don't try to commit
+    if (base == current) {
+      return;
+    }
+
+    // this is always set to the latest commit attempt's snapshot id.
+    AtomicLong currentSnapshotId = new AtomicLong(-1L);
+
+    try {
+      Tasks.foreach(ops)
+          .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .onlyRetryOn(CommitFailedException.class)
+          .run(underlyingOps -> {
+            if (base != underlyingOps.refresh()) {
+              this.base = underlyingOps.current(); // just refreshed
+              this.current = base;
+              for (PendingUpdate update : updates) {
+                // re-commit each update in the chain to apply it and update current
+                update.commit();
+              }
+            }
+
+            currentSnapshotId.set(current.currentSnapshot().snapshotId());
+
+            // fix up the snapshot log, which should not contain intermediate snapshots
+            underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
+          });
+
+    } catch (RuntimeException e) {
+      // the commit failed and no files were committed. clean up each update.
+      Tasks.foreach(updates)
+          .suppressFailureWhenFinished()
+          .run(update -> {
+            if (update instanceof SnapshotProducer) {
+              ((SnapshotProducer) update).cleanAll();
+            }
+          });
+
+      // delete all files that were cleaned up
+      Tasks.foreach(deletedFiles)
+          .suppressFailureWhenFinished()
+          .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+          .run(ops.io()::deleteFile);
+
+      throw e;
+    }
+
+    // the commit succeeded
+
+    try {
+      intermediateSnapshotIds.add(currentSnapshotId.get());
+
+      // clean up the data files that were deleted by each operation. first, get the list of committed manifests to
+      // ensure that no committed manifest is deleted. a manifest could be deleted in one successful operation
+      // commit, but reused in another successful commit of that operation if the whole transaction is retried.
+      Set<String> committedFiles = committedFiles(ops, intermediateSnapshotIds);
+      if (committedFiles != null) {
+        // delete all of the files that were deleted in the most recent set of operation commits
+        Tasks.foreach(deletedFiles)
+            .suppressFailureWhenFinished()
+            .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+            .run(path -> {
+              if (!committedFiles.contains(path)) {
+                ops.io().deleteFile(path);
+              }
+            });
+      } else {
+        LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up");
+      }
+
+    } catch (RuntimeException e) {
+      LOG.warn("Failed to load committed metadata, skipping clean-up", e);
+    }
+  }
+
   private static Set<String> committedFiles(TableOperations ops, Set<Long> intermediateSnapshotIds) {
     Set<String> committedFiles = Sets.newHashSet();
 
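
Seen from the caller's side, the guarantee added above is that a failed create or replace transaction no longer leaves its staged manifests behind. A minimal, hypothetical usage sketch follows; the Transaction would come from a catalog's create- or replace-table API (elided here), and the DataFile is a placeholder. The tests below exercise the same flow through the TestTables helpers.

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Transaction;
    import org.apache.iceberg.exceptions.CommitFailedException;

    class TransactionCleanupExample {
      // txn and file are placeholders: txn is assumed to come from a catalog's
      // create- or replace-table transaction API, file is any prepared DataFile
      static void appendAndCommit(Transaction txn, DataFile file) {
        // stage an append: manifests are written, but no table metadata is committed yet
        txn.newAppend()
            .appendFile(file)
            .commit();

        try {
          // one atomic commit for everything staged in the transaction
          txn.commitTransaction();
        } catch (CommitFailedException e) {
          // with this change, the manifests written for the staged append have already
          // been cleaned up by BaseTransaction before the exception reaches the caller
          throw e;
        }
      }
    }
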
diff --git a/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java b/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java
index bc8bf4b..e73359d 100644
--- a/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -265,6 +266,9 @@ public class TestCreateTransaction extends TableTestBase {
 
     Transaction txn = TestTables.beginCreate(tableDir, "test_conflict", SCHEMA, SPEC);
 
+    // append in the transaction to ensure a manifest file is created
+    txn.newAppend().appendFile(FILE_A).commit();
+
     Assert.assertNull("Starting a create transaction should not commit metadata",
         TestTables.readMetadata("test_conflict"));
     Assert.assertNull("Should have no metadata version",
@@ -281,6 +285,8 @@ public class TestCreateTransaction extends TableTestBase {
 
     AssertHelpers.assertThrows("Transaction commit should fail",
         CommitFailedException.class, "Commit failed: table was updated", txn::commitTransaction);
+
+    Assert.assertEquals("Should clean up metadata", Sets.newHashSet(), Sets.newHashSet(listManifestFiles(tableDir)));
   }
 
   private static Schema assignFreshIds(Schema schema) {
diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
index 27422b6..68ef9b9 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
@@ -19,8 +19,10 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.types.TypeUtil;
@@ -207,7 +209,8 @@ public class TestReplaceTransaction extends TableTestBase {
         .appendFile(FILE_D)
         .commit();
 
-    table.ops().failCommits(1);
+    // trigger eventual transaction retry
+    ((TestTables.TestTableOperations) ((BaseTransaction) replace).ops()).failCommits(1);
 
     replace.commitTransaction();
 
@@ -232,6 +235,7 @@ public class TestReplaceTransaction extends TableTestBase {
     Assert.assertEquals("Version should be 1", 1L, (long) version());
 
     validateSnapshot(start, table.currentSnapshot(), FILE_A);
+    Set<File> manifests = Sets.newHashSet(listManifestFiles());
 
     Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec());
 
@@ -253,6 +257,8 @@ public class TestReplaceTransaction extends TableTestBase {
     table.refresh();
 
     validateSnapshot(start, table.currentSnapshot(), FILE_A);
+
+    Assert.assertEquals("Should clean up replace manifests", manifests, Sets.newHashSet(listManifestFiles()));
   }
 
   @Test