You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/07 00:54:51 UTC
[incubator-iceberg] branch master updated: Fix transaction cleanup (#352)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d8ca485 Fix transaction cleanup (#352)
d8ca485 is described below
commit d8ca485d50a64ad6209deae93948c4e75f139e76
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Aug 6 17:54:46 2019 -0700
Fix transaction cleanup (#352)
---
.../java/org/apache/iceberg/BaseTransaction.java | 102 ++++++++++++++++-----
.../java/org/apache/iceberg/SnapshotProducer.java | 4 +-
2 files changed, 82 insertions(+), 24 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 1353bb7..31f6692 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -26,12 +26,16 @@ import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.util.Exceptions;
import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
@@ -43,6 +47,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
class BaseTransaction implements Transaction {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseTransaction.class);
+
private enum TransactionType {
CREATE_TABLE,
REPLACE_TABLE,
@@ -229,35 +235,87 @@ class BaseTransaction implements Transaction {
return;
}
- Tasks.foreach(ops)
- .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .onlyRetryOn(CommitFailedException.class)
- .run(underlyingOps -> {
- if (base != underlyingOps.refresh()) {
- this.base = underlyingOps.current(); // just refreshed
- this.current = base;
- this.deletedFiles.clear(); // clear deletes from the last set of operation commits
- for (PendingUpdate update : updates) {
- // re-commit each update in the chain to apply it and update current
- update.commit();
+ // this is always set to the latest commit attempt's snapshot id.
+ AtomicLong currentSnapshotId = new AtomicLong(-1L);
+
+ try {
+ Tasks.foreach(ops)
+ .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(underlyingOps -> {
+ if (base != underlyingOps.refresh()) {
+ this.base = underlyingOps.current(); // just refreshed
+ this.current = base;
+ for (PendingUpdate update : updates) {
+ // re-commit each update in the chain to apply it and update current
+ update.commit();
+ }
}
- }
- // fix up the snapshot log, which should not contain intermediate snapshots
- underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
- });
+ currentSnapshotId.set(current.currentSnapshot().snapshotId());
+
+ // fix up the snapshot log, which should not contain intermediate snapshots
+ underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
+ });
+
+ } catch (RuntimeException e) {
+ // the commit failed and there are no committed manifests. delete any file cleaned up by an operation.
+ Exceptions.suppressAndThrow(e, () -> deletedFiles.forEach(ops.io()::deleteFile));
+ }
+
+ // the commit succeeded
+
+ try {
+ intermediateSnapshotIds.add(currentSnapshotId.get());
+
+ // clean up the data files that were deleted by each operation. first, get the list of committed manifests to
+ // ensure that no committed manifest is deleted. a manifest could be deleted in one successful operation
+ // commit, but reused in another successful commit of that operation if the whole transaction is retried.
+ Set<String> committedFiles = committedFiles(ops, intermediateSnapshotIds);
+ if (committedFiles != null) {
+ // delete all of the files that were deleted in the most recent set of operation commits
+ Tasks.foreach(deletedFiles)
+ .suppressFailureWhenFinished()
+ .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+ .run(path -> {
+ if (!committedFiles.contains(path)) {
+ ops.io().deleteFile(path);
+ }
+ });
+ } else {
+ LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up");
+ }
+
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to load committed metadata, skipping clean-up", e);
+ }
- // delete all of the files that were deleted in the most recent set of operation commits
- deletedFiles.forEach(ops.io()::deleteFile);
break;
}
}
+ /**
+  * Collects the manifest list location and every manifest path referenced by the given snapshots.
+  *
+  * @param ops table operations whose current metadata is used to look up each snapshot
+  * @param intermediateSnapshotIds ids of the snapshots whose metadata files should be collected
+  * @return the set of manifest list locations and manifest paths for all of the snapshots, or
+  *         null if any snapshot id is not found in the current metadata
+  */
+ private static Set<String> committedFiles(TableOperations ops, Set<Long> intermediateSnapshotIds) {
+ Set<String> committedFiles = Sets.newHashSet();
+
+ for (long snapshotId : intermediateSnapshotIds) {
+ // look the snapshot up in the table's current metadata
+ Snapshot snap = ops.current().snapshot(snapshotId);
+ if (snap != null) {
+ // record both the manifest list file and each manifest it references
+ committedFiles.add(snap.manifestListLocation());
+ snap.manifests()
+ .forEach(manifest -> committedFiles.add(manifest.path()));
+ } else {
+ // the snapshot is missing from the current metadata, so the result would be incomplete;
+ // return null rather than a partial set
+ return null;
+ }
+ }
+
+ return committedFiles;
+ }
+
private static Long currentId(TableMetadata meta) {
if (meta != null) {
if (meta.currentSnapshot() != null) {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 83f15b5..37097e0 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -270,11 +270,11 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
} else {
// saved may not be present if the latest metadata couldn't be loaded due to eventual
// consistency problems in refresh. in that case, don't clean up.
- LOG.info("Failed to load committed snapshot, skipping manifest clean-up");
+ LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
}
} catch (RuntimeException e) {
- LOG.info("Failed to load committed table metadata, skipping manifest clean-up", e);
+ LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e);
}
}