You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/10 21:22:39 UTC
[incubator-iceberg] branch master updated: Add appendManifest to
AppendFiles API (#201)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6ac23e0 Add appendManifest to AppendFiles API (#201)
6ac23e0 is described below
commit 6ac23e03735d4514480a9f0155200faf7179f21b
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Mon Jun 10 14:22:35 2019 -0700
Add appendManifest to AppendFiles API (#201)
This is intended for writers that need to checkpoint state. Instead of checkpointing individual data files, such writers can write manifest files and then append the contents of those manifests to a table.
---
.../main/java/org/apache/iceberg/AppendFiles.java | 11 ++
.../main/java/org/apache/iceberg/FastAppend.java | 38 ++++++-
.../java/org/apache/iceberg/ManifestWriter.java | 115 +++++++++++++++------
.../main/java/org/apache/iceberg/MergeAppend.java | 6 ++
.../apache/iceberg/MergingSnapshotProducer.java | 50 +++++++--
.../java/org/apache/iceberg/ReplaceManifests.java | 2 +-
.../java/org/apache/iceberg/TableTestBase.java | 19 ++++
.../java/org/apache/iceberg/TestFastAppend.java | 58 +++++++++++
.../java/org/apache/iceberg/TestMergeAppend.java | 83 +++++++++++++++
9 files changed, 338 insertions(+), 44 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/AppendFiles.java b/api/src/main/java/org/apache/iceberg/AppendFiles.java
index 912b103..a54b835 100644
--- a/api/src/main/java/org/apache/iceberg/AppendFiles.java
+++ b/api/src/main/java/org/apache/iceberg/AppendFiles.java
@@ -36,4 +36,15 @@ public interface AppendFiles extends SnapshotUpdate<AppendFiles> {
* @return this for method chaining
*/
AppendFiles appendFile(DataFile file);
+
+ /**
+ * Append the contents of a manifest to the table.
+ * <p>
+ * The manifest must contain only appended files. All files in the manifest will be appended to
+ * the table in the snapshot created by this update.
+ *
+ * @param file a manifest file
+ * @return this for method chaining
+ */
+ AppendFiles appendManifest(ManifestFile file);
}
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 6bf560c..73b68d4 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
@@ -34,14 +35,18 @@ import org.apache.iceberg.io.OutputFile;
* This implementation will attempt to commit 5 times before throwing {@link CommitFailedException}.
*/
class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
+ private final TableOperations ops;
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
+ private final List<ManifestFile> appendManifests = Lists.newArrayList();
private ManifestFile newManifest = null;
+ private final AtomicInteger manifestCount = new AtomicInteger(0);
private boolean hasNewFiles = false;
FastAppend(TableOperations ops) {
super(ops);
+ this.ops = ops;
this.spec = ops.current().spec();
}
@@ -70,15 +75,34 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
}
@Override
+ public FastAppend appendManifest(ManifestFile manifest) {
+ // the manifest must be rewritten with this update's snapshot ID
+ try (ManifestReader reader = ManifestReader.read(
+ ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
+ OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
+ appendManifests.add(ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder));
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
+ }
+
+ return this;
+ }
+
+ @Override
public List<ManifestFile> apply(TableMetadata base) {
List<ManifestFile> newManifests = Lists.newArrayList();
try {
- newManifests.add(writeManifest());
+ ManifestFile manifest = writeManifest();
+ if (manifest != null) {
+ newManifests.add(manifest);
+ }
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to write manifest");
}
+ newManifests.addAll(appendManifests);
+
if (base.currentSnapshot() != null) {
newManifests.addAll(base.currentSnapshot().manifests());
}
@@ -88,9 +112,15 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
- if (!committed.contains(newManifest)) {
+ if (newManifest != null && !committed.contains(newManifest)) {
deleteFile(newManifest.path());
}
+
+ for (ManifestFile manifest : appendManifests) {
+ if (!committed.contains(manifest)) {
+ deleteFile(manifest.path());
+ }
+ }
}
private ManifestFile writeManifest() throws IOException {
@@ -99,8 +129,8 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
newManifest = null;
}
- if (newManifest == null) {
- OutputFile out = manifestPath(0);
+ if (newManifest == null && newFiles.size() > 0) {
+ OutputFile out = manifestPath(manifestCount.getAndIncrement());
ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
try {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 402cdcf..0fe7da3 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -31,9 +31,50 @@ import org.slf4j.LoggerFactory;
/**
* Writer for manifest files.
*/
-class ManifestWriter implements FileAppender<DataFile> {
+public class ManifestWriter implements FileAppender<DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
+ static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
+ SnapshotSummary.Builder summaryBuilder) {
+ ManifestWriter writer = new ManifestWriter(reader.spec(), outputFile, snapshotId);
+ boolean threw = true;
+ try {
+ for (ManifestEntry entry : reader.entries()) {
+ Preconditions.checkArgument(entry.status() == ManifestEntry.Status.ADDED,
+ "Cannot append manifest: contains existing files");
+ summaryBuilder.addedFile(reader.spec(), entry.file());
+ writer.add(entry);
+ }
+
+ threw = false;
+
+ } finally {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ if (!threw) {
+ throw new RuntimeIOException(e, "Failed to close manifest: %s", outputFile);
+ }
+ }
+ }
+
+ return writer.toManifestFile();
+ }
+
+ /**
+ * Create a new {@link ManifestWriter}.
+ * <p>
+ * Manifests created by this writer are not part of a snapshot and have all entry snapshot IDs
+ * set to -1.
+ *
+ * @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples
+ * @param outputFile the destination file location
+ * @return a manifest writer
+ */
+ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
+ return new ManifestWriter(spec, outputFile, -1);
+ }
+
private final OutputFile file;
private final int specId;
private final FileAppender<ManifestEntry> writer;
@@ -55,33 +96,7 @@ class ManifestWriter implements FileAppender<DataFile> {
this.stats = new PartitionSummary(spec);
}
- public void addExisting(Iterable<ManifestEntry> entries) {
- for (ManifestEntry entry : entries) {
- if (entry.status() != ManifestEntry.Status.DELETED) {
- addExisting(entry);
- }
- }
- }
-
- public void addExisting(ManifestEntry entry) {
- add(reused.wrapExisting(entry.snapshotId(), entry.file()));
- }
-
- public void addExisting(long newSnapshotId, DataFile newFile) {
- add(reused.wrapExisting(newSnapshotId, newFile));
- }
-
- public void delete(ManifestEntry entry) {
- // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
- // when this Snapshot has been removed or when there are no Snapshots older than this one.
- add(reused.wrapDelete(snapshotId, entry.file()));
- }
-
- public void delete(DataFile deletedFile) {
- add(reused.wrapDelete(snapshotId, deletedFile));
- }
-
- public void add(ManifestEntry entry) {
+ void addEntry(ManifestEntry entry) {
switch (entry.status()) {
case ADDED:
addedFiles += 1;
@@ -97,11 +112,53 @@ class ManifestWriter implements FileAppender<DataFile> {
writer.add(entry);
}
+ /**
+ * Add an added entry for a data file.
+ * <p>
+ * The entry's snapshot ID will be this manifest's snapshot ID.
+ *
+ * @param addedFile a data file
+ */
@Override
public void add(DataFile addedFile) {
// TODO: this assumes that file is a GenericDataFile that can be written directly to Avro
// Eventually, this should check in case there are other DataFile implementations.
- add(reused.wrapAppend(snapshotId, addedFile));
+ addEntry(reused.wrapAppend(snapshotId, addedFile));
+ }
+
+ public void add(ManifestEntry entry) {
+ addEntry(reused.wrapAppend(snapshotId, entry.file()));
+ }
+
+ /**
+ * Add an existing entry for a data file.
+ *
+ * @param existingFile a data file
+ * @param fileSnapshotId snapshot ID when the data file was added to the table
+ */
+ public void existing(DataFile existingFile, long fileSnapshotId) {
+ addEntry(reused.wrapExisting(fileSnapshotId, existingFile));
+ }
+
+ void existing(ManifestEntry entry) {
+ addEntry(reused.wrapExisting(entry.snapshotId(), entry.file()));
+ }
+
+ /**
+ * Add a delete entry for a data file.
+ * <p>
+ * The entry's snapshot ID will be this manifest's snapshot ID.
+ *
+ * @param deletedFile a data file
+ */
+ public void delete(DataFile deletedFile) {
+ addEntry(reused.wrapDelete(snapshotId, deletedFile));
+ }
+
+ void delete(ManifestEntry entry) {
+ // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
+ // when this Snapshot has been removed or when there are no Snapshots older than this one.
+ addEntry(reused.wrapDelete(snapshotId, entry.file()));
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java b/core/src/main/java/org/apache/iceberg/MergeAppend.java
index 2646515..e84f45d 100644
--- a/core/src/main/java/org/apache/iceberg/MergeAppend.java
+++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java
@@ -46,4 +46,10 @@ class MergeAppend extends MergingSnapshotProducer<AppendFiles> implements Append
add(file);
return this;
}
+
+ @Override
+ public AppendFiles appendManifest(ManifestFile manifest) {
+ add(manifest);
+ return this;
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index c9a72f3..548aced 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.reflect.Array;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -82,6 +83,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// update data
private final AtomicInteger manifestCount = new AtomicInteger(0);
private final List<DataFile> newFiles = Lists.newArrayList();
+ private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final Set<CharSequenceWrapper> deletePaths = Sets.newHashSet();
private final Set<StructLikeWrapper> dropPartitions = Sets.newHashSet();
private Expression deleteExpression = Expressions.alwaysFalse();
@@ -179,6 +181,20 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
newFiles.add(file);
}
+ /**
+ * Add all files in a manifest to the new snapshot.
+ */
+ protected void add(ManifestFile manifest) {
+ // the manifest must be rewritten with this update's snapshot ID
+ try (ManifestReader reader = ManifestReader.read(
+ ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
+ appendManifests.add(ManifestWriter.copyAppendManifest(
+ reader, manifestPath(manifestCount.getAndIncrement()), snapshotId(), summaryBuilder));
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
+ }
+ }
+
@Override
protected Map<String, String> summary() {
return summaryBuilder.build();
@@ -216,13 +232,17 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
Set<CharSequenceWrapper> deletedFiles = Sets.newHashSet();
- // group manifests by compatible partition specs to be merged
+ // filter any existing manifests
+ List<ManifestFile> filtered;
if (current != null) {
List<ManifestFile> manifests = current.manifests();
- ManifestFile[] filtered = filterManifests(metricsEvaluator, manifests);
- groupManifestsByPartitionSpec(groups, deletedFiles, filtered);
+ filtered = Arrays.asList(filterManifests(metricsEvaluator, manifests));
+ } else {
+ filtered = ImmutableList.of();
}
+ groupManifestsByPartitionSpec(groups, deletedFiles, Iterables.concat(appendManifests, filtered));
+
List<ManifestFile> manifests = Lists.newArrayList();
for (Map.Entry<Integer, List<ManifestFile>> entry : groups.entrySet()) {
int groupId = entry.getKey();
@@ -261,7 +281,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private void groupManifestsByPartitionSpec(
Map<Integer, List<ManifestFile>> groups,
- Set<CharSequenceWrapper> deletedFiles, ManifestFile[] filtered) {
+ Set<CharSequenceWrapper> deletedFiles, Iterable<ManifestFile> filtered) {
for (ManifestFile manifest : filtered) {
PartitionSpec manifestSpec = ops.current().spec(manifest.partitionSpecId());
Iterable<DataFile> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
@@ -320,14 +340,24 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
}
- @Override
- protected void cleanUncommitted(Set<ManifestFile> committed) {
+ private void cleanUncommittedAppends(Set<ManifestFile> committed) {
if (cachedNewManifest != null && !committed.contains(cachedNewManifest)) {
deleteFile(cachedNewManifest.path());
this.cachedNewManifest = null;
}
+
+ for (ManifestFile manifest : appendManifests) {
+ if (!committed.contains(manifest)) {
+ deleteFile(manifest.path());
+ }
+ }
+ }
+
+ @Override
+ protected void cleanUncommitted(Set<ManifestFile> committed) {
cleanUncommittedMerges(committed);
cleanUncommittedFilters(committed);
+ cleanUncommittedAppends(committed);
}
private boolean nothingToFilter() {
@@ -439,7 +469,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
deletedPaths.add(wrapper);
} else {
- writer.addExisting(entry);
+ writer.existing(entry);
}
}
});
@@ -534,14 +564,14 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// suppress deletes from previous snapshots. only files deleted by this snapshot
// should be added to the new manifest
if (entry.snapshotId() == snapshotId()) {
- writer.add(entry);
+ writer.addEntry(entry);
}
} else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
// adds from this snapshot are still adds, otherwise they should be existing
- writer.add(entry);
+ writer.addEntry(entry);
} else {
// add all files from the old manifest as existing files
- writer.addExisting(entry);
+ writer.existing(entry);
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
index 0077070..73a0505 100644
--- a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
+++ b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
@@ -236,7 +236,7 @@ public class ReplaceManifests extends SnapshotProducer<RewriteManifests> impleme
writer = newWriter();
}
- writer.addExisting(entry);
+ writer.existing(entry);
estimatedSize += len;
}
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 65ce946..6c4f682 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import java.io.File;
+import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
@@ -119,6 +121,23 @@ public class TableTestBase {
return TestTables.readMetadata("test");
}
+ ManifestFile writeManifest(DataFile... files) throws IOException {
+ File manifestFile = temp.newFile("input.m0.avro");
+ Assert.assertTrue(manifestFile.delete());
+ OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
+
+ ManifestWriter writer = ManifestWriter.write(table.spec(), outputFile);
+ try {
+ for (DataFile file : files) {
+ writer.add(file);
+ }
+ } finally {
+ writer.close();
+ }
+
+ return writer.toManifestFile();
+ }
+
void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
List<ManifestFile> oldManifests = old != null ? old.manifests() : ImmutableList.of();
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index f036a85..e336f5f 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.exceptions.CommitFailedException;
@@ -46,6 +47,45 @@ public class TestFastAppend extends TableTestBase {
}
@Test
+ public void testEmptyTableAppendManifest() throws IOException {
+ Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+ TableMetadata base = readMetadata();
+ Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+ ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+ Snapshot pending = table.newFastAppend()
+ .appendManifest(manifest)
+ .apply();
+
+ validateSnapshot(base.currentSnapshot(), pending, FILE_A, FILE_B);
+ }
+
+ @Test
+ public void testEmptyTableAppendFilesAndManifest() throws IOException {
+ Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+ TableMetadata base = readMetadata();
+ Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+ ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+ Snapshot pending = table.newFastAppend()
+ .appendFile(FILE_C)
+ .appendFile(FILE_D)
+ .appendManifest(manifest)
+ .apply();
+
+ long pendingId = pending.snapshotId();
+
+ validateManifest(pending.manifests().get(0),
+ ids(pendingId, pendingId),
+ files(FILE_C, FILE_D));
+ validateManifest(pending.manifests().get(1),
+ ids(pendingId, pendingId),
+ files(FILE_A, FILE_B));
+ }
+
+ @Test
public void testNonEmptyTableAppend() {
table.newAppend()
.appendFile(FILE_A)
@@ -171,6 +211,24 @@ public class TestFastAppend extends TableTestBase {
}
@Test
+ public void testAppendManifestCleanup() throws IOException {
+ // inject 5 failures
+ TestTables.TestTableOperations ops = table.ops();
+ ops.failCommits(5);
+
+ ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+ AppendFiles append = table.newFastAppend().appendManifest(manifest);
+ Snapshot pending = append.apply();
+ ManifestFile newManifest = pending.manifests().get(0);
+ Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
+
+ AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
+ CommitFailedException.class, "Injected failure", append::commit);
+
+ Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
+ }
+
+ @Test
public void testRecoveryWithManifestList() {
table.updateProperties().set(TableProperties.MANIFEST_LISTS_ENABLED, "true").commit();
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 33b2b09..64d8948 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
+import java.io.IOException;
import java.util.Set;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
@@ -52,6 +53,70 @@ public class TestMergeAppend extends TableTestBase {
}
@Test
+ public void testEmptyTableAppendManifest() throws IOException {
+ Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+ TableMetadata base = readMetadata();
+ Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+ ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+ Snapshot pending = table.newAppend()
+ .appendManifest(manifest)
+ .apply();
+
+ validateSnapshot(base.currentSnapshot(), pending, FILE_A, FILE_B);
+ }
+
+ @Test
+ public void testEmptyTableAppendFilesAndManifest() throws IOException {
+ Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+ TableMetadata base = readMetadata();
+ Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+ ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+ Snapshot pending = table.newAppend()
+ .appendFile(FILE_C)
+ .appendFile(FILE_D)
+ .appendManifest(manifest)
+ .apply();
+
+ long pendingId = pending.snapshotId();
+
+ validateManifest(pending.manifests().get(0),
+ ids(pendingId, pendingId),
+ files(FILE_C, FILE_D));
+ validateManifest(pending.manifests().get(1),
+ ids(pendingId, pendingId),
+ files(FILE_A, FILE_B));
+ }
+
+ @Test
+ public void testMergeWithAppendFilesAndManifest() throws IOException {
+ // merge all manifests for this test
+ table.updateProperties().set("commit.manifest.min-count-to-merge", "1").commit();
+
+ Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+ TableMetadata base = readMetadata();
+ Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+ ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+ Snapshot pending = table.newAppend()
+ .appendFile(FILE_C)
+ .appendFile(FILE_D)
+ .appendManifest(manifest)
+ .apply();
+
+ long pendingId = pending.snapshotId();
+
+ Assert.assertEquals("Should create 1 merged manifest", 1, pending.manifests().size());
+ validateManifest(pending.manifests().get(0),
+ ids(pendingId, pendingId, pendingId, pendingId),
+ files(FILE_C, FILE_D, FILE_A, FILE_B));
+ }
+
+ @Test
public void testMergeWithExistingManifest() {
// merge all manifests for this test
table.updateProperties().set("commit.manifest.min-count-to-merge", "1").commit();
@@ -341,6 +406,24 @@ public class TestMergeAppend extends TableTestBase {
}
@Test
+ public void testAppendManifestCleanup() throws IOException {
+ // inject 5 failures
+ TestTables.TestTableOperations ops = table.ops();
+ ops.failCommits(5);
+
+ ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+ AppendFiles append = table.newAppend().appendManifest(manifest);
+ Snapshot pending = append.apply();
+ ManifestFile newManifest = pending.manifests().get(0);
+ Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
+
+ AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
+ CommitFailedException.class, "Injected failure", append::commit);
+
+ Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
+ }
+
+ @Test
public void testRecovery() {
// merge all manifests for this test
table.updateProperties().set("commit.manifest.min-count-to-merge", "1").commit();