You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/08 20:06:25 UTC
[incubator-iceberg] branch master updated: Clean up after create and replace transaction failures (#364)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ce226cf Clean up after create and replace transaction failures (#364)
ce226cf is described below
commit ce226cf8ba851641f789d36bd2077925e63ef65b
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Aug 8 13:06:20 2019 -0700
Clean up after create and replace transaction failures (#364)
---
.../java/org/apache/iceberg/BaseTransaction.java | 252 +++++++++++++--------
.../org/apache/iceberg/TestCreateTransaction.java | 6 +
.../org/apache/iceberg/TestReplaceTransaction.java | 8 +-
3 files changed, 173 insertions(+), 93 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 31f6692..dd1a623 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -32,7 +32,6 @@ import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
-import org.apache.iceberg.util.Exceptions;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -198,107 +197,176 @@ class BaseTransaction implements Transaction {
switch (type) {
case CREATE_TABLE:
- // fix up the snapshot log, which should not contain intermediate snapshots
- TableMetadata createMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
-
- // this operation creates the table. if the commit fails, this cannot retry because another
- // process has created the same table.
- ops.commit(null, createMetadata);
+ commitCreateTransaction();
break;
case REPLACE_TABLE:
- // fix up the snapshot log, which should not contain intermediate snapshots
- TableMetadata replaceMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
-
- Tasks.foreach(ops)
- .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .onlyRetryOn(CommitFailedException.class)
- .run(underlyingOps -> {
- // because this is a replace table, it will always completely replace the table
- // metadata. even if it was just updated.
- if (base != underlyingOps.refresh()) {
- this.base = underlyingOps.current(); // just refreshed
- }
-
- underlyingOps.commit(base, replaceMetadata);
- });
+ commitReplaceTransaction();
break;
case SIMPLE:
- // if there were no changes, don't try to commit
- if (base == current) {
- return;
- }
-
- // this is always set to the latest commit attempt's snapshot id.
- AtomicLong currentSnapshotId = new AtomicLong(-1L);
-
- try {
- Tasks.foreach(ops)
- .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .onlyRetryOn(CommitFailedException.class)
- .run(underlyingOps -> {
- if (base != underlyingOps.refresh()) {
- this.base = underlyingOps.current(); // just refreshed
- this.current = base;
- for (PendingUpdate update : updates) {
- // re-commit each update in the chain to apply it and update current
- update.commit();
- }
- }
-
- currentSnapshotId.set(current.currentSnapshot().snapshotId());
-
- // fix up the snapshot log, which should not contain intermediate snapshots
- underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
- });
-
- } catch (RuntimeException e) {
- // the commit failed and there are no committed manifests. delete any file cleaned up by an operation.
- Exceptions.suppressAndThrow(e, () -> deletedFiles.forEach(ops.io()::deleteFile));
- }
-
- // the commit succeeded
-
- try {
- intermediateSnapshotIds.add(currentSnapshotId.get());
-
- // clean up the data files that were deleted by each operation. first, get the list of committed manifests to
- // ensure that no committed manifest is deleted. a manifest could be deleted in one successful operation
- // commit, but reused in another successful commit of that operation if the whole transaction is retried.
- Set<String> committedFiles = committedFiles(ops, intermediateSnapshotIds);
- if (committedFiles != null) {
- // delete all of the files that were deleted in the most recent set of operation commits
- Tasks.foreach(deletedFiles)
- .suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
- .run(path -> {
- if (!committedFiles.contains(path)) {
- ops.io().deleteFile(path);
- }
- });
- } else {
- LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up");
- }
-
- } catch (RuntimeException e) {
- LOG.warn("Failed to load committed metadata, skipping clean-up", e);
- }
-
+ commitSimpleTransaction();
break;
}
}
+ private void commitCreateTransaction() {
+ // fix up the snapshot log, which should not contain intermediate snapshots
+ TableMetadata createMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
+
+ // this operation creates the table. if the commit fails, this cannot retry because another
+ // process has created the same table.
+ try {
+ ops.commit(null, createMetadata);
+
+ } catch (RuntimeException e) {
+ // the commit failed and no files were committed. clean up each update.
+ Tasks.foreach(updates)
+ .suppressFailureWhenFinished()
+ .run(update -> {
+ if (update instanceof SnapshotProducer) {
+ ((SnapshotProducer) update).cleanAll();
+ }
+ });
+
+ throw e;
+
+ } finally {
+ // create table never needs to retry because the table has no previous state. because retries are not a
+ // concern, it is safe to delete all of the deleted files from individual operations
+ Tasks.foreach(deletedFiles)
+ .suppressFailureWhenFinished()
+ .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+ .run(ops.io()::deleteFile);
+ }
+ }
+
+ private void commitReplaceTransaction() {
+ // fix up the snapshot log, which should not contain intermediate snapshots
+ TableMetadata replaceMetadata = current.removeSnapshotLogEntries(intermediateSnapshotIds);
+
+ try {
+ Tasks.foreach(ops)
+ .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(underlyingOps -> {
+ // because this is a replace table, it will always completely replace the table
+ // metadata. even if it was just updated.
+ if (base != underlyingOps.refresh()) {
+ this.base = underlyingOps.current(); // just refreshed
+ }
+
+ underlyingOps.commit(base, replaceMetadata);
+ });
+
+ } catch (RuntimeException e) {
+ // the commit failed and no files were committed. clean up each update.
+ Tasks.foreach(updates)
+ .suppressFailureWhenFinished()
+ .run(update -> {
+ if (update instanceof SnapshotProducer) {
+ ((SnapshotProducer) update).cleanAll();
+ }
+ });
+
+ throw e;
+
+ } finally {
+ // replace table never needs to retry because the table state is completely replaced. because retries are not
+ // a concern, it is safe to delete all of the deleted files from individual operations
+ Tasks.foreach(deletedFiles)
+ .suppressFailureWhenFinished()
+ .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+ .run(ops.io()::deleteFile);
+ }
+ }
+
+ private void commitSimpleTransaction() {
+ // if there were no changes, don't try to commit
+ if (base == current) {
+ return;
+ }
+
+ // this is always set to the latest commit attempt's snapshot id.
+ AtomicLong currentSnapshotId = new AtomicLong(-1L);
+
+ try {
+ Tasks.foreach(ops)
+ .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(underlyingOps -> {
+ if (base != underlyingOps.refresh()) {
+ this.base = underlyingOps.current(); // just refreshed
+ this.current = base;
+ for (PendingUpdate update : updates) {
+ // re-commit each update in the chain to apply it and update current
+ update.commit();
+ }
+ }
+
+ currentSnapshotId.set(current.currentSnapshot().snapshotId());
+
+ // fix up the snapshot log, which should not contain intermediate snapshots
+ underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
+ });
+
+ } catch (RuntimeException e) {
+ // the commit failed and no files were committed. clean up each update.
+ Tasks.foreach(updates)
+ .suppressFailureWhenFinished()
+ .run(update -> {
+ if (update instanceof SnapshotProducer) {
+ ((SnapshotProducer) update).cleanAll();
+ }
+ });
+
+ // delete all files that were cleaned up
+ Tasks.foreach(deletedFiles)
+ .suppressFailureWhenFinished()
+ .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+ .run(ops.io()::deleteFile);
+
+ throw e;
+ }
+
+ // the commit succeeded
+
+ try {
+ intermediateSnapshotIds.add(currentSnapshotId.get());
+
+ // clean up the data files that were deleted by each operation. first, get the list of committed manifests to
+ // ensure that no committed manifest is deleted. a manifest could be deleted in one successful operation
+ // commit, but reused in another successful commit of that operation if the whole transaction is retried.
+ Set<String> committedFiles = committedFiles(ops, intermediateSnapshotIds);
+ if (committedFiles != null) {
+ // delete all of the files that were deleted in the most recent set of operation commits
+ Tasks.foreach(deletedFiles)
+ .suppressFailureWhenFinished()
+ .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+ .run(path -> {
+ if (!committedFiles.contains(path)) {
+ ops.io().deleteFile(path);
+ }
+ });
+ } else {
+ LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up");
+ }
+
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to load committed metadata, skipping clean-up", e);
+ }
+ }
+
private static Set<String> committedFiles(TableOperations ops, Set<Long> intermediateSnapshotIds) {
Set<String> committedFiles = Sets.newHashSet();
diff --git a/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java b/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java
index bc8bf4b..e73359d 100644
--- a/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -265,6 +266,9 @@ public class TestCreateTransaction extends TableTestBase {
Transaction txn = TestTables.beginCreate(tableDir, "test_conflict", SCHEMA, SPEC);
+ // append in the transaction to ensure a manifest file is created
+ txn.newAppend().appendFile(FILE_A).commit();
+
Assert.assertNull("Starting a create transaction should not commit metadata",
TestTables.readMetadata("test_conflict"));
Assert.assertNull("Should have no metadata version",
@@ -281,6 +285,8 @@ public class TestCreateTransaction extends TableTestBase {
AssertHelpers.assertThrows("Transaction commit should fail",
CommitFailedException.class, "Commit failed: table was updated", txn::commitTransaction);
+
+ Assert.assertEquals("Should clean up metadata", Sets.newHashSet(), Sets.newHashSet(listManifestFiles(tableDir)));
}
private static Schema assignFreshIds(Schema schema) {
diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
index 27422b6..68ef9b9 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
@@ -19,8 +19,10 @@
package org.apache.iceberg;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.types.TypeUtil;
@@ -207,7 +209,8 @@ public class TestReplaceTransaction extends TableTestBase {
.appendFile(FILE_D)
.commit();
- table.ops().failCommits(1);
+ // trigger eventual transaction retry
+ ((TestTables.TestTableOperations) ((BaseTransaction) replace).ops()).failCommits(1);
replace.commitTransaction();
@@ -232,6 +235,7 @@ public class TestReplaceTransaction extends TableTestBase {
Assert.assertEquals("Version should be 1", 1L, (long) version());
validateSnapshot(start, table.currentSnapshot(), FILE_A);
+ Set<File> manifests = Sets.newHashSet(listManifestFiles());
Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec());
@@ -253,6 +257,8 @@ public class TestReplaceTransaction extends TableTestBase {
table.refresh();
validateSnapshot(start, table.currentSnapshot(), FILE_A);
+
+ Assert.assertEquals("Should clean up replace manifests", manifests, Sets.newHashSet(listManifestFiles()));
}
@Test