You are viewing a plain-text version of this content; the canonical link for it was not preserved in this copy.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/06/23 16:52:15 UTC
[incubator-iceberg] branch master updated: Fix uncommitted file clean-up in transactions (#218)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 97485a0 Fix uncommitted file clean-up in transactions (#218)
97485a0 is described below
commit 97485a098eaf2e518366495884105ab9d8c0992c
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Sun Jun 23 09:52:11 2019 -0700
Fix uncommitted file clean-up in transactions (#218)
* Fix uncommitted file clean-up in transactions.
This adds a callback for deleting files. Transaction installs its own callback,
which records deletes instead of running them. When the transaction is
committed, only the most recent set of recorded deletes is run. As a result,
the files actually deleted are those from the last commit of each operation in
the transaction.
* Update for baseline and fix problems porting to OSS.
---
.../java/org/apache/iceberg/SnapshotUpdate.java | 10 +++
.../java/org/apache/iceberg/BaseTransaction.java | 20 ++++-
.../main/java/org/apache/iceberg/FastAppend.java | 5 ++
.../apache/iceberg/MergingSnapshotProducer.java | 2 -
.../java/org/apache/iceberg/ReplaceManifests.java | 5 ++
.../java/org/apache/iceberg/SnapshotProducer.java | 28 ++++++-
.../java/org/apache/iceberg/TestTransaction.java | 87 ++++++++++++++++++++++
7 files changed, 151 insertions(+), 6 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index fcead03..5cabc02 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -19,6 +19,8 @@
package org.apache.iceberg;
+import java.util.function.Consumer;
+
/**
* API for table changes that produce snapshots. This interface contains common methods for all
* updates that create a new table {@link Snapshot}.
@@ -35,4 +37,12 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
*/
ThisT set(String property, String value);
+ /**
+ * Set a callback to delete files instead of the table's default.
+ *
+ * @param deleteFunc a String consumer used to delete locations.
+ * @return this for method chaining
+ */
+ ThisT deleteWith(Consumer<String> deleteFunc);
+
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index be91e0a..d21b728 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Consumer;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;
@@ -62,12 +63,13 @@ class BaseTransaction implements Transaction {
return new BaseTransaction(ops, ops.refresh());
}
- // exposed for testing
private final TableOperations ops;
private final TransactionTable transactionTable;
private final TableOperations transactionOps;
private final List<PendingUpdate> updates;
private final Set<Long> intermediateSnapshotIds;
+ private final Set<String> deletedFiles = Sets.newHashSet(); // keep track of files deleted in the most recent commit
+ private final Consumer<String> enqueueDelete = deletedFiles::add;
private TransactionType type;
private TableMetadata base;
private TableMetadata lastBase;
@@ -130,6 +132,7 @@ class BaseTransaction implements Transaction {
public AppendFiles newAppend() {
checkLastOperationCommitted("AppendFiles");
AppendFiles append = new MergeAppend(transactionOps);
+ append.deleteWith(enqueueDelete);
updates.add(append);
return append;
}
@@ -146,6 +149,7 @@ class BaseTransaction implements Transaction {
public RewriteFiles newRewrite() {
checkLastOperationCommitted("RewriteFiles");
RewriteFiles rewrite = new ReplaceFiles(transactionOps);
+ rewrite.deleteWith(enqueueDelete);
updates.add(rewrite);
return rewrite;
}
@@ -154,6 +158,7 @@ class BaseTransaction implements Transaction {
public RewriteManifests rewriteManifests() {
checkLastOperationCommitted("RewriteManifests");
RewriteManifests rewrite = new ReplaceManifests(transactionOps);
+ rewrite.deleteWith(enqueueDelete);
updates.add(rewrite);
return rewrite;
}
@@ -162,6 +167,7 @@ class BaseTransaction implements Transaction {
public OverwriteFiles newOverwrite() {
checkLastOperationCommitted("OverwriteFiles");
OverwriteFiles overwrite = new OverwriteData(transactionOps);
+ overwrite.deleteWith(enqueueDelete);
updates.add(overwrite);
return overwrite;
}
@@ -170,6 +176,7 @@ class BaseTransaction implements Transaction {
public ReplacePartitions newReplacePartitions() {
checkLastOperationCommitted("ReplacePartitions");
ReplacePartitionsOperation replacePartitions = new ReplacePartitionsOperation(transactionOps);
+ replacePartitions.deleteWith(enqueueDelete);
updates.add(replacePartitions);
return replacePartitions;
}
@@ -178,6 +185,7 @@ class BaseTransaction implements Transaction {
public DeleteFiles newDelete() {
checkLastOperationCommitted("DeleteFiles");
DeleteFiles delete = new StreamingDelete(transactionOps);
+ delete.deleteWith(enqueueDelete);
updates.add(delete);
return delete;
}
@@ -186,6 +194,7 @@ class BaseTransaction implements Transaction {
public ExpireSnapshots expireSnapshots() {
checkLastOperationCommitted("ExpireSnapshots");
ExpireSnapshots expire = new RemoveSnapshots(transactionOps);
+ expire.deleteWith(enqueueDelete);
updates.add(expire);
return expire;
}
@@ -246,6 +255,7 @@ class BaseTransaction implements Transaction {
if (base != underlyingOps.refresh()) {
this.base = underlyingOps.current(); // just refreshed
this.current = base;
+ this.deletedFiles.clear(); // clear deletes from the last set of operation commits
for (PendingUpdate update : updates) {
// re-commit each update in the chain to apply it and update current
update.commit();
@@ -255,6 +265,9 @@ class BaseTransaction implements Transaction {
// fix up the snapshot log, which should not contain intermediate snapshots
underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
});
+
+ // delete all of the files that were deleted in the most recent set of operation commits
+ deletedFiles.forEach(ops.io()::deleteFile);
break;
}
}
@@ -452,4 +465,9 @@ class BaseTransaction implements Transaction {
TableOperations ops() {
return ops;
}
+
+ @VisibleForTesting
+ Set<String> deletedFiles() {
+ return deletedFiles;
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 73b68d4..e4fce79 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -51,6 +51,11 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
}
@Override
+ protected AppendFiles self() {
+ return this;
+ }
+
+ @Override
public AppendFiles set(String property, String value) {
summaryBuilder.set(property, value);
return this;
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 548aced..823b150 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -116,8 +116,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
.propertyAsInt(MANIFEST_MIN_MERGE_COUNT, MANIFEST_MIN_MERGE_COUNT_DEFAULT);
}
- protected abstract ThisT self();
-
@Override
public ThisT set(String property, String value) {
summaryBuilder.set(property, value);
diff --git a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
index b43cfc6..771bd0d 100644
--- a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
+++ b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
@@ -74,6 +74,11 @@ public class ReplaceManifests extends SnapshotProducer<RewriteManifests> impleme
}
@Override
+ protected RewriteManifests self() {
+ return this;
+ }
+
+ @Override
protected String operation() {
return DataOperations.REPLACE;
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index ed5a508..109fbd9 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -32,6 +33,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
@@ -57,6 +59,16 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
/**
+ * Default callback used to delete files.
+ */
+ private final Consumer<String> defaultDelete = new Consumer<String>() {
+ @Override
+ public void accept(String file) {
+ ops.io().deleteFile(file);
+ }
+ };
+
+ /**
* Cache used to enrich ManifestFile instances that are written to a ManifestListWriter.
*/
private final LoadingCache<ManifestFile, ManifestFile> manifestsWithMetadata;
@@ -67,6 +79,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
private final List<String> manifestLists = Lists.newArrayList();
private Long snapshotId = null;
private TableMetadata base = null;
+ private Consumer<String> deleteFunc = defaultDelete;
protected SnapshotProducer(TableOperations ops) {
this.ops = ops;
@@ -81,6 +94,15 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
});
}
+ protected abstract ThisT self();
+
+ @Override
+ public ThisT deleteWith(Consumer<String> deleteCallback) {
+ Preconditions.checkArgument(this.deleteFunc == defaultDelete, "Cannot set delete callback more than once");
+ this.deleteFunc = deleteCallback;
+ return self();
+ }
+
/**
* Clean up any uncommitted manifests that were created.
* <p>
@@ -227,7 +249,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
// also clean up unused manifest lists created by multiple attempts
for (String manifestList : manifestLists) {
if (!saved.manifestListLocation().equals(manifestList)) {
- ops.io().deleteFile(manifestList);
+ deleteFile(manifestList);
}
}
} else {
@@ -243,14 +265,14 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
protected void cleanAll() {
for (String manifestList : manifestLists) {
- ops.io().deleteFile(manifestList);
+ deleteFile(manifestList);
}
manifestLists.clear();
cleanUncommitted(EMPTY_SET);
}
protected void deleteFile(String path) {
- ops.io().deleteFile(path);
+ deleteFunc.accept(path);
}
protected OutputFile manifestListPath() {
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 8fa8b77..a920f28 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -19,12 +19,15 @@
package org.apache.iceberg;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.File;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.io.OutputFile;
import org.junit.Assert;
import org.junit.Test;
@@ -461,6 +464,90 @@ public class TestTransaction extends TableTestBase {
}
@Test
+ public void testTransactionRetryAndAppendManifests() throws Exception {
+ // use only one retry and aggressively merge manifests
+ table.updateProperties()
+ .set(TableProperties.COMMIT_NUM_RETRIES, "1")
+ .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
+ .commit();
+
+ Assert.assertEquals("Table should be on version 1", 1, (int) version());
+
+ table.newAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ Assert.assertEquals("Table should be on version 2 after append", 2, (int) version());
+ Assert.assertEquals("Append should create one manifest", 1, table.currentSnapshot().manifests().size());
+ ManifestFile v1manifest = table.currentSnapshot().manifests().get(0);
+
+ TableMetadata base = readMetadata();
+
+ // create a manifest append
+ OutputFile manifestLocation = Files.localOutput("/tmp/" + UUID.randomUUID().toString() + ".avro");
+ ManifestWriter writer = ManifestWriter.write(table.spec(), manifestLocation);
+ try {
+ writer.add(FILE_D);
+ } finally {
+ writer.close();
+ }
+
+ Transaction txn = table.newTransaction();
+
+ txn.newAppend()
+ .appendManifest(writer.toManifestFile())
+ .commit();
+
+ Assert.assertSame("Base metadata should not change when commit is created", base, readMetadata());
+ Assert.assertEquals("Table should be on version 2 after txn create", 2, (int) version());
+
+ Assert.assertEquals("Append should have one merged manifest", 1, txn.table().currentSnapshot().manifests().size());
+ ManifestFile mergedManifest = txn.table().currentSnapshot().manifests().get(0);
+
+ // find the initial copy of the appended manifest
+ String copiedAppendManifest = Iterables.getOnlyElement(Iterables.filter(
+ Iterables.transform(listManifestFiles(), File::getPath),
+ path -> !v1manifest.path().contains(path) && !mergedManifest.path().contains(path)));
+
+ Assert.assertTrue("Transaction should hijack the delete of the original copied manifest",
+ ((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
+ Assert.assertTrue("Copied append manifest should not be deleted yet", new File(copiedAppendManifest).exists());
+
+ // cause the transaction commit to fail and retry
+ table.newAppend()
+ .appendFile(FILE_C)
+ .commit();
+
+ Assert.assertEquals("Table should be on version 3 after real append", 3, (int) version());
+
+ txn.commitTransaction();
+
+ Assert.assertEquals("Table should be on version 4 after commit", 4, (int) version());
+
+ Assert.assertTrue("Transaction should hijack the delete of the original copied manifest",
+ ((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
+ Assert.assertFalse("Append manifest should be deleted", new File(copiedAppendManifest).exists());
+ Assert.assertTrue("Transaction should hijack the delete of the first merged manifest",
+ ((BaseTransaction) txn).deletedFiles().contains(mergedManifest.path()));
+ Assert.assertFalse("Append manifest should be deleted", new File(mergedManifest.path()).exists());
+
+ Assert.assertEquals("Should merge all commit manifests into a single manifest",
+ 1, table.currentSnapshot().manifests().size());
+ }
+
+ @Test
+ public void testTransactionNoCustomDeleteFunc() {
+ AssertHelpers.assertThrows("Should fail setting a custom delete function with a transaction",
+ IllegalArgumentException.class, "Cannot set delete callback more than once",
+ () -> table.newTransaction()
+ .newAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .deleteWith(file -> { }));
+ }
+
+ @Test
public void testTransactionFastAppends() {
table.updateProperties()
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")