You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/07 00:54:51 UTC

[incubator-iceberg] branch master updated: Fix transaction cleanup (#352)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d8ca485  Fix transaction cleanup (#352)
d8ca485 is described below

commit d8ca485d50a64ad6209deae93948c4e75f139e76
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Aug 6 17:54:46 2019 -0700

    Fix transaction cleanup (#352)
---
 .../java/org/apache/iceberg/BaseTransaction.java   | 102 ++++++++++++++++-----
 .../java/org/apache/iceberg/SnapshotProducer.java  |   4 +-
 2 files changed, 82 insertions(+), 24 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 1353bb7..31f6692 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -26,12 +26,16 @@ import com.google.common.collect.Sets;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.util.Exceptions;
 import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
 import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
@@ -43,6 +47,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
 class BaseTransaction implements Transaction {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseTransaction.class);
+
   private enum TransactionType {
     CREATE_TABLE,
     REPLACE_TABLE,
@@ -229,35 +235,87 @@ class BaseTransaction implements Transaction {
           return;
         }
 
-        Tasks.foreach(ops)
-            .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-            .exponentialBackoff(
-                base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-                base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-                base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-                2.0 /* exponential */)
-            .onlyRetryOn(CommitFailedException.class)
-            .run(underlyingOps -> {
-              if (base != underlyingOps.refresh()) {
-                this.base = underlyingOps.current(); // just refreshed
-                this.current = base;
-                this.deletedFiles.clear(); // clear deletes from the last set of operation commits
-                for (PendingUpdate update : updates) {
-                  // re-commit each update in the chain to apply it and update current
-                  update.commit();
+        // this is always set to the latest commit attempt's snapshot id.
+        AtomicLong currentSnapshotId = new AtomicLong(-1L);
+
+        try {
+          Tasks.foreach(ops)
+              .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+              .exponentialBackoff(
+                  base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+                  base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+                  base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+                  2.0 /* exponential */)
+              .onlyRetryOn(CommitFailedException.class)
+              .run(underlyingOps -> {
+                if (base != underlyingOps.refresh()) {
+                  this.base = underlyingOps.current(); // just refreshed
+                  this.current = base;
+                  for (PendingUpdate update : updates) {
+                    // re-commit each update in the chain to apply it and update current
+                    update.commit();
+                  }
                 }
-              }
 
-              // fix up the snapshot log, which should not contain intermediate snapshots
-              underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
-            });
+                currentSnapshotId.set(current.currentSnapshot().snapshotId());
+
+                // fix up the snapshot log, which should not contain intermediate snapshots
+                underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
+              });
+
+        } catch (RuntimeException e) {
+          // the commit failed and there are no committed manifests. delete any file cleaned up by an operation.
+          Exceptions.suppressAndThrow(e, () -> deletedFiles.forEach(ops.io()::deleteFile));
+        }
+
+        // the commit succeeded
+
+        try {
+          intermediateSnapshotIds.add(currentSnapshotId.get());
+
+          // clean up the data files that were deleted by each operation. first, get the list of committed manifests to
+          // ensure that no committed manifest is deleted. a manifest could be deleted in one successful operation
+          // commit, but reused in another successful commit of that operation if the whole transaction is retried.
+          Set<String> committedFiles = committedFiles(ops, intermediateSnapshotIds);
+          if (committedFiles != null) {
+            // delete all of the files that were deleted in the most recent set of operation commits
+            Tasks.foreach(deletedFiles)
+                .suppressFailureWhenFinished()
+                .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+                .run(path -> {
+                  if (!committedFiles.contains(path)) {
+                    ops.io().deleteFile(path);
+                  }
+                });
+          } else {
+            LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up");
+          }
+
+        } catch (RuntimeException e) {
+          LOG.warn("Failed to load committed metadata, skipping clean-up", e);
+        }
 
-        // delete all of the files that were deleted in the most recent set of operation commits
-        deletedFiles.forEach(ops.io()::deleteFile);
         break;
     }
   }
 
+  private static Set<String> committedFiles(TableOperations ops, Set<Long> intermediateSnapshotIds) {
+    Set<String> committedFiles = Sets.newHashSet(); // paths the caller must not delete during clean-up
+
+    for (long snapshotId : intermediateSnapshotIds) {
+      Snapshot snap = ops.current().snapshot(snapshotId); // looked up in the current table metadata
+      if (snap != null) {
+        committedFiles.add(snap.manifestListLocation()); // the snapshot's manifest list file
+        snap.manifests()
+            .forEach(manifest -> committedFiles.add(manifest.path())); // and every manifest it references
+      } else {
+        return null; // snapshot not found in current metadata; the caller treats null as "skip clean-up"
+      }
+    }
+
+    return committedFiles;
+  }
+
   private static Long currentId(TableMetadata meta) {
     if (meta != null) {
       if (meta.currentSnapshot() != null) {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 83f15b5..37097e0 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -270,11 +270,11 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
       } else {
         // saved may not be present if the latest metadata couldn't be loaded due to eventual
         // consistency problems in refresh. in that case, don't clean up.
-        LOG.info("Failed to load committed snapshot, skipping manifest clean-up");
+        LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
       }
 
     } catch (RuntimeException e) {
-      LOG.info("Failed to load committed table metadata, skipping manifest clean-up", e);
+      LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e);
     }
   }