You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/06/23 16:52:15 UTC

[incubator-iceberg] branch master updated: Fix uncommitted file clean-up in transactions (#218)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 97485a0  Fix uncommitted file clean-up in transactions (#218)
97485a0 is described below

commit 97485a098eaf2e518366495884105ab9d8c0992c
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Sun Jun 23 09:52:11 2019 -0700

    Fix uncommitted file clean-up in transactions (#218)
    
    * Fix uncommitted file clean-up in transactions.
    
    This adds a deleteWith callback to SnapshotUpdate that replaces the
    table's default file deletion, and BaseTransaction passes a callback that
    records deletes instead of running them. When a transaction is committed,
    only the deletes recorded in the most recent round of operation commits
    are run, so the files actually deleted are those cleaned up by the final
    commit of each operation in the transaction.
    
    * Update for baseline and fix problems porting to OSS.
---
 .../java/org/apache/iceberg/SnapshotUpdate.java    | 10 +++
 .../java/org/apache/iceberg/BaseTransaction.java   | 20 ++++-
 .../main/java/org/apache/iceberg/FastAppend.java   |  5 ++
 .../apache/iceberg/MergingSnapshotProducer.java    |  2 -
 .../java/org/apache/iceberg/ReplaceManifests.java  |  5 ++
 .../java/org/apache/iceberg/SnapshotProducer.java  | 28 ++++++-
 .../java/org/apache/iceberg/TestTransaction.java   | 87 ++++++++++++++++++++++
 7 files changed, 151 insertions(+), 6 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index fcead03..5cabc02 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg;
 
+import java.util.function.Consumer;
+
 /**
  * API for table changes that produce snapshots. This interface contains common methods for all
  * updates that create a new table {@link Snapshot}.
@@ -35,4 +37,12 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
    */
   ThisT set(String property, String value);
 
+  /**
+   * Set a callback to delete files instead of the table's default.
+   *
+   * @param deleteFunc a String consumer used to delete locations.
+   * @return this for method chaining
+   */
+  ThisT deleteWith(Consumer<String> deleteFunc);
+
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index be91e0a..d21b728 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.io.FileIO;
@@ -62,12 +63,13 @@ class BaseTransaction implements Transaction {
     return new BaseTransaction(ops, ops.refresh());
   }
 
-  // exposed for testing
   private final TableOperations ops;
   private final TransactionTable transactionTable;
   private final TableOperations transactionOps;
   private final List<PendingUpdate> updates;
   private final Set<Long> intermediateSnapshotIds;
+  private final Set<String> deletedFiles = Sets.newHashSet(); // keep track of files deleted in the most recent commit
+  private final Consumer<String> enqueueDelete = deletedFiles::add;
   private TransactionType type;
   private TableMetadata base;
   private TableMetadata lastBase;
@@ -130,6 +132,7 @@ class BaseTransaction implements Transaction {
   public AppendFiles newAppend() {
     checkLastOperationCommitted("AppendFiles");
     AppendFiles append = new MergeAppend(transactionOps);
+    append.deleteWith(enqueueDelete);
     updates.add(append);
     return append;
   }
@@ -146,6 +149,7 @@ class BaseTransaction implements Transaction {
   public RewriteFiles newRewrite() {
     checkLastOperationCommitted("RewriteFiles");
     RewriteFiles rewrite = new ReplaceFiles(transactionOps);
+    rewrite.deleteWith(enqueueDelete);
     updates.add(rewrite);
     return rewrite;
   }
@@ -154,6 +158,7 @@ class BaseTransaction implements Transaction {
   public RewriteManifests rewriteManifests() {
     checkLastOperationCommitted("RewriteManifests");
     RewriteManifests rewrite = new ReplaceManifests(transactionOps);
+    rewrite.deleteWith(enqueueDelete);
     updates.add(rewrite);
     return rewrite;
   }
@@ -162,6 +167,7 @@ class BaseTransaction implements Transaction {
   public OverwriteFiles newOverwrite() {
     checkLastOperationCommitted("OverwriteFiles");
     OverwriteFiles overwrite = new OverwriteData(transactionOps);
+    overwrite.deleteWith(enqueueDelete);
     updates.add(overwrite);
     return overwrite;
   }
@@ -170,6 +176,7 @@ class BaseTransaction implements Transaction {
   public ReplacePartitions newReplacePartitions() {
     checkLastOperationCommitted("ReplacePartitions");
     ReplacePartitionsOperation replacePartitions = new ReplacePartitionsOperation(transactionOps);
+    replacePartitions.deleteWith(enqueueDelete);
     updates.add(replacePartitions);
     return replacePartitions;
   }
@@ -178,6 +185,7 @@ class BaseTransaction implements Transaction {
   public DeleteFiles newDelete() {
     checkLastOperationCommitted("DeleteFiles");
     DeleteFiles delete = new StreamingDelete(transactionOps);
+    delete.deleteWith(enqueueDelete);
     updates.add(delete);
     return delete;
   }
@@ -186,6 +194,7 @@ class BaseTransaction implements Transaction {
   public ExpireSnapshots expireSnapshots() {
     checkLastOperationCommitted("ExpireSnapshots");
     ExpireSnapshots expire = new RemoveSnapshots(transactionOps);
+    expire.deleteWith(enqueueDelete);
     updates.add(expire);
     return expire;
   }
@@ -246,6 +255,7 @@ class BaseTransaction implements Transaction {
               if (base != underlyingOps.refresh()) {
                 this.base = underlyingOps.current(); // just refreshed
                 this.current = base;
+                this.deletedFiles.clear(); // clear deletes from the last set of operation commits
                 for (PendingUpdate update : updates) {
                   // re-commit each update in the chain to apply it and update current
                   update.commit();
@@ -255,6 +265,9 @@ class BaseTransaction implements Transaction {
               // fix up the snapshot log, which should not contain intermediate snapshots
               underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
             });
+
+        // delete all of the files that were deleted in the most recent set of operation commits
+        deletedFiles.forEach(ops.io()::deleteFile);
         break;
     }
   }
@@ -452,4 +465,9 @@ class BaseTransaction implements Transaction {
   TableOperations ops() {
     return ops;
   }
+
+  @VisibleForTesting
+  Set<String> deletedFiles() {
+    return deletedFiles;
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 73b68d4..e4fce79 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -51,6 +51,11 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   }
 
   @Override
+  protected AppendFiles self() {
+    return this;
+  }
+
+  @Override
   public AppendFiles set(String property, String value) {
     summaryBuilder.set(property, value);
     return this;
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 548aced..823b150 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -116,8 +116,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
         .propertyAsInt(MANIFEST_MIN_MERGE_COUNT, MANIFEST_MIN_MERGE_COUNT_DEFAULT);
   }
 
-  protected abstract ThisT self();
-
   @Override
   public ThisT set(String property, String value) {
     summaryBuilder.set(property, value);
diff --git a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
index b43cfc6..771bd0d 100644
--- a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
+++ b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
@@ -74,6 +74,11 @@ public class ReplaceManifests extends SnapshotProducer<RewriteManifests> impleme
   }
 
   @Override
+  protected RewriteManifests self() {
+    return this;
+  }
+
+  @Override
   protected String operation() {
     return DataOperations.REPLACE;
   }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index ed5a508..109fbd9 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -32,6 +33,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.OutputFile;
@@ -57,6 +59,16 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
 
   /**
+   * Default callback used to delete files.
+   */
+  private final Consumer<String> defaultDelete = new Consumer<String>() {
+    @Override
+    public void accept(String file) {
+      ops.io().deleteFile(file);
+    }
+  };
+
+  /**
    * Cache used to enrich ManifestFile instances that are written to a ManifestListWriter.
    */
   private final LoadingCache<ManifestFile, ManifestFile> manifestsWithMetadata;
@@ -67,6 +79,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   private final List<String> manifestLists = Lists.newArrayList();
   private Long snapshotId = null;
   private TableMetadata base = null;
+  private Consumer<String> deleteFunc = defaultDelete;
 
   protected SnapshotProducer(TableOperations ops) {
     this.ops = ops;
@@ -81,6 +94,15 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
       });
   }
 
+  protected abstract ThisT self();
+
+  @Override
+  public ThisT deleteWith(Consumer<String> deleteCallback) {
+    Preconditions.checkArgument(this.deleteFunc == defaultDelete, "Cannot set delete callback more than once");
+    this.deleteFunc = deleteCallback;
+    return self();
+  }
+
   /**
    * Clean up any uncommitted manifests that were created.
    * <p>
@@ -227,7 +249,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
         // also clean up unused manifest lists created by multiple attempts
         for (String manifestList : manifestLists) {
           if (!saved.manifestListLocation().equals(manifestList)) {
-            ops.io().deleteFile(manifestList);
+            deleteFile(manifestList);
           }
         }
       } else {
@@ -243,14 +265,14 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
 
   protected void cleanAll() {
     for (String manifestList : manifestLists) {
-      ops.io().deleteFile(manifestList);
+      deleteFile(manifestList);
     }
     manifestLists.clear();
     cleanUncommitted(EMPTY_SET);
   }
 
   protected void deleteFile(String path) {
-    ops.io().deleteFile(path);
+    deleteFunc.accept(path);
   }
 
   protected OutputFile manifestListPath() {
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 8fa8b77..a920f28 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -19,12 +19,15 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.io.OutputFile;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -461,6 +464,90 @@ public class TestTransaction extends TableTestBase {
   }
 
   @Test
+  public void testTransactionRetryAndAppendManifests() throws Exception {
+    // use only one retry and aggressively merge manifests
+    table.updateProperties()
+        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
+        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
+        .commit();
+
+    Assert.assertEquals("Table should be on version 1", 1, (int) version());
+
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Assert.assertEquals("Table should be on version 2 after append", 2, (int) version());
+    Assert.assertEquals("Append should create one manifest", 1, table.currentSnapshot().manifests().size());
+    ManifestFile v1manifest = table.currentSnapshot().manifests().get(0);
+
+    TableMetadata base = readMetadata();
+
+    // create a manifest append
+    OutputFile manifestLocation = Files.localOutput("/tmp/" + UUID.randomUUID().toString() + ".avro");
+    ManifestWriter writer = ManifestWriter.write(table.spec(), manifestLocation);
+    try {
+      writer.add(FILE_D);
+    } finally {
+      writer.close();
+    }
+
+    Transaction txn = table.newTransaction();
+
+    txn.newAppend()
+        .appendManifest(writer.toManifestFile())
+        .commit();
+
+    Assert.assertSame("Base metadata should not change when commit is created", base, readMetadata());
+    Assert.assertEquals("Table should be on version 2 after txn create", 2, (int) version());
+
+    Assert.assertEquals("Append should have one merged manifest", 1, txn.table().currentSnapshot().manifests().size());
+    ManifestFile mergedManifest = txn.table().currentSnapshot().manifests().get(0);
+
+    // find the initial copy of the appended manifest
+    String copiedAppendManifest = Iterables.getOnlyElement(Iterables.filter(
+        Iterables.transform(listManifestFiles(), File::getPath),
+        path -> !v1manifest.path().contains(path) && !mergedManifest.path().contains(path)));
+
+    Assert.assertTrue("Transaction should hijack the delete of the original copied manifest",
+        ((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
+    Assert.assertTrue("Copied append manifest should not be deleted yet", new File(copiedAppendManifest).exists());
+
+    // cause the transaction commit to fail and retry
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Assert.assertEquals("Table should be on version 3 after real append", 3, (int) version());
+
+    txn.commitTransaction();
+
+    Assert.assertEquals("Table should be on version 4 after commit", 4, (int) version());
+
+    Assert.assertTrue("Transaction should still hijack the delete of the original copied manifest after retry",
+        ((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
+    Assert.assertFalse("Copied append manifest should be deleted", new File(copiedAppendManifest).exists());
+    Assert.assertTrue("Transaction should hijack the delete of the first merged manifest",
+        ((BaseTransaction) txn).deletedFiles().contains(mergedManifest.path()));
+    Assert.assertFalse("First merged manifest should be deleted", new File(mergedManifest.path()).exists());
+
+    Assert.assertEquals("Should merge all commit manifests into a single manifest",
+        1, table.currentSnapshot().manifests().size());
+  }
+
+  @Test
+  public void testTransactionNoCustomDeleteFunc() {
+    AssertHelpers.assertThrows("Should fail setting a custom delete function with a transaction",
+        IllegalArgumentException.class, "Cannot set delete callback more than once",
+        () -> table.newTransaction()
+            .newAppend()
+            .appendFile(FILE_A)
+            .appendFile(FILE_B)
+            .deleteWith(file -> { }));
+  }
+
+  @Test
   public void testTransactionFastAppends() {
     table.updateProperties()
         .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")