You are viewing a plain-text version of this content. The canonical link for it is available on the original mailing-list archive page.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/10 21:22:39 UTC

[incubator-iceberg] branch master updated: Add appendManifest to AppendFiles API (#201)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ac23e0  Add appendManifest to AppendFiles API (#201)
6ac23e0 is described below

commit 6ac23e03735d4514480a9f0155200faf7179f21b
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Mon Jun 10 14:22:35 2019 -0700

    Add appendManifest to AppendFiles API (#201)
    
    This is intended for writers that need to checkpoint state. Rather than checkpointing individual data files, writers that checkpoint should be able to
    create manifest files and append the contents of those manifests to a table.
---
 .../main/java/org/apache/iceberg/AppendFiles.java  |  11 ++
 .../main/java/org/apache/iceberg/FastAppend.java   |  38 ++++++-
 .../java/org/apache/iceberg/ManifestWriter.java    | 115 +++++++++++++++------
 .../main/java/org/apache/iceberg/MergeAppend.java  |   6 ++
 .../apache/iceberg/MergingSnapshotProducer.java    |  50 +++++++--
 .../java/org/apache/iceberg/ReplaceManifests.java  |   2 +-
 .../java/org/apache/iceberg/TableTestBase.java     |  19 ++++
 .../java/org/apache/iceberg/TestFastAppend.java    |  58 +++++++++++
 .../java/org/apache/iceberg/TestMergeAppend.java   |  83 +++++++++++++++
 9 files changed, 338 insertions(+), 44 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/AppendFiles.java b/api/src/main/java/org/apache/iceberg/AppendFiles.java
index 912b103..a54b835 100644
--- a/api/src/main/java/org/apache/iceberg/AppendFiles.java
+++ b/api/src/main/java/org/apache/iceberg/AppendFiles.java
@@ -36,4 +36,15 @@ public interface AppendFiles extends SnapshotUpdate<AppendFiles> {
    * @return this for method chaining
    */
   AppendFiles appendFile(DataFile file);
+
+  /**
+   * Append the contents of a manifest to the table.
+   * <p>
+   * The manifest must contain only appended files. All files in the manifest will be appended to
+   * the table in the snapshot created by this update.
+   *
+   * @param file a manifest file
+   * @return this for method chaining
+   */
+  AppendFiles appendManifest(ManifestFile file);
 }
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 6bf560c..73b68d4 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.OutputFile;
@@ -34,14 +35,18 @@ import org.apache.iceberg.io.OutputFile;
  * This implementation will attempt to commit 5 times before throwing {@link CommitFailedException}.
  */
 class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
+  private final TableOperations ops;
   private final PartitionSpec spec;
   private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
   private final List<DataFile> newFiles = Lists.newArrayList();
+  private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private ManifestFile newManifest = null;
+  private final AtomicInteger manifestCount = new AtomicInteger(0);
   private boolean hasNewFiles = false;
 
   FastAppend(TableOperations ops) {
     super(ops);
+    this.ops = ops;
     this.spec = ops.current().spec();
   }
 
@@ -70,15 +75,34 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   }
 
   @Override
+  public FastAppend appendManifest(ManifestFile manifest) {
+    // the manifest must be rewritten with this update's snapshot ID
+    try (ManifestReader reader = ManifestReader.read(
+        ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
+      OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
+      appendManifests.add(ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
+    }
+
+    return this;
+  }
+
+  @Override
   public List<ManifestFile> apply(TableMetadata base) {
     List<ManifestFile> newManifests = Lists.newArrayList();
 
     try {
-      newManifests.add(writeManifest());
+      ManifestFile manifest = writeManifest();
+      if (manifest != null) {
+        newManifests.add(manifest);
+      }
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to write manifest");
     }
 
+    newManifests.addAll(appendManifests);
+
     if (base.currentSnapshot() != null) {
       newManifests.addAll(base.currentSnapshot().manifests());
     }
@@ -88,9 +112,15 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
 
   @Override
   protected void cleanUncommitted(Set<ManifestFile> committed) {
-    if (!committed.contains(newManifest)) {
+    if (newManifest != null && !committed.contains(newManifest)) {
       deleteFile(newManifest.path());
     }
+
+    for (ManifestFile manifest : appendManifests) {
+      if (!committed.contains(manifest)) {
+        deleteFile(manifest.path());
+      }
+    }
   }
 
   private ManifestFile writeManifest() throws IOException {
@@ -99,8 +129,8 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
       newManifest = null;
     }
 
-    if (newManifest == null) {
-      OutputFile out = manifestPath(0);
+    if (newManifest == null && newFiles.size() > 0) {
+      OutputFile out = manifestPath(manifestCount.getAndIncrement());
 
       ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
       try {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 402cdcf..0fe7da3 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -31,9 +31,50 @@ import org.slf4j.LoggerFactory;
 /**
  * Writer for manifest files.
  */
-class ManifestWriter implements FileAppender<DataFile> {
+public class ManifestWriter implements FileAppender<DataFile> {
   private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
 
+  static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
+                                         SnapshotSummary.Builder summaryBuilder) {
+    ManifestWriter writer = new ManifestWriter(reader.spec(), outputFile, snapshotId);
+    boolean threw = true;
+    try {
+      for (ManifestEntry entry : reader.entries()) {
+        Preconditions.checkArgument(entry.status() == ManifestEntry.Status.ADDED,
+            "Cannot append manifest: contains existing files");
+        summaryBuilder.addedFile(reader.spec(), entry.file());
+        writer.add(entry);
+      }
+
+      threw = false;
+
+    } finally {
+      try {
+        writer.close();
+      } catch (IOException e) {
+        if (!threw) {
+          throw new RuntimeIOException(e, "Failed to close manifest: %s", outputFile);
+        }
+      }
+    }
+
+    return writer.toManifestFile();
+  }
+
+  /**
+   * Create a new {@link ManifestWriter}.
+   * <p>
+   * Manifests created by this writer are not part of a snapshot and have all entry snapshot IDs
+   * set to -1.
+   *
+   * @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples
+   * @param outputFile the destination file location
+   * @return a manifest writer
+   */
+  public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
+    return new ManifestWriter(spec, outputFile, -1);
+  }
+
   private final OutputFile file;
   private final int specId;
   private final FileAppender<ManifestEntry> writer;
@@ -55,33 +96,7 @@ class ManifestWriter implements FileAppender<DataFile> {
     this.stats = new PartitionSummary(spec);
   }
 
-  public void addExisting(Iterable<ManifestEntry> entries) {
-    for (ManifestEntry entry : entries) {
-      if (entry.status() != ManifestEntry.Status.DELETED) {
-        addExisting(entry);
-      }
-    }
-  }
-
-  public void addExisting(ManifestEntry entry) {
-    add(reused.wrapExisting(entry.snapshotId(), entry.file()));
-  }
-
-  public void addExisting(long newSnapshotId, DataFile newFile) {
-    add(reused.wrapExisting(newSnapshotId, newFile));
-  }
-
-  public void delete(ManifestEntry entry) {
-    // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
-    // when this Snapshot has been removed or when there are no Snapshots older than this one.
-    add(reused.wrapDelete(snapshotId, entry.file()));
-  }
-
-  public void delete(DataFile deletedFile) {
-    add(reused.wrapDelete(snapshotId, deletedFile));
-  }
-
-  public void add(ManifestEntry entry) {
+  void addEntry(ManifestEntry entry) {
     switch (entry.status()) {
       case ADDED:
         addedFiles += 1;
@@ -97,11 +112,53 @@ class ManifestWriter implements FileAppender<DataFile> {
     writer.add(entry);
   }
 
+  /**
+   * Add an added entry for a data file.
+   * <p>
+   * The entry's snapshot ID will be this manifest's snapshot ID.
+   *
+   * @param addedFile a data file
+   */
   @Override
   public void add(DataFile addedFile) {
     // TODO: this assumes that file is a GenericDataFile that can be written directly to Avro
     // Eventually, this should check in case there are other DataFile implementations.
-    add(reused.wrapAppend(snapshotId, addedFile));
+    addEntry(reused.wrapAppend(snapshotId, addedFile));
+  }
+
+  public void add(ManifestEntry entry) {
+    addEntry(reused.wrapAppend(snapshotId, entry.file()));
+  }
+
+  /**
+   * Add an existing entry for a data file.
+   *
+   * @param existingFile a data file
+   * @param fileSnapshotId snapshot ID when the data file was added to the table
+   */
+  public void existing(DataFile existingFile, long fileSnapshotId) {
+    addEntry(reused.wrapExisting(fileSnapshotId, existingFile));
+  }
+
+  void existing(ManifestEntry entry) {
+    addEntry(reused.wrapExisting(entry.snapshotId(), entry.file()));
+  }
+
+  /**
+   * Add a delete entry for a data file.
+   * <p>
+   * The entry's snapshot ID will be this manifest's snapshot ID.
+   *
+   * @param deletedFile a data file
+   */
+  public void delete(DataFile deletedFile) {
+    addEntry(reused.wrapDelete(snapshotId, deletedFile));
+  }
+
+  void delete(ManifestEntry entry) {
+    // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
+    // when this Snapshot has been removed or when there are no Snapshots older than this one.
+    addEntry(reused.wrapDelete(snapshotId, entry.file()));
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java b/core/src/main/java/org/apache/iceberg/MergeAppend.java
index 2646515..e84f45d 100644
--- a/core/src/main/java/org/apache/iceberg/MergeAppend.java
+++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java
@@ -46,4 +46,10 @@ class MergeAppend extends MergingSnapshotProducer<AppendFiles> implements Append
     add(file);
     return this;
   }
+
+  @Override
+  public AppendFiles appendManifest(ManifestFile manifest) {
+    add(manifest);
+    return this;
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index c9a72f3..548aced 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -82,6 +83,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   // update data
   private final AtomicInteger manifestCount = new AtomicInteger(0);
   private final List<DataFile> newFiles = Lists.newArrayList();
+  private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final Set<CharSequenceWrapper> deletePaths = Sets.newHashSet();
   private final Set<StructLikeWrapper> dropPartitions = Sets.newHashSet();
   private Expression deleteExpression = Expressions.alwaysFalse();
@@ -179,6 +181,20 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     newFiles.add(file);
   }
 
+  /**
+   * Add all files in a manifest to the new snapshot.
+   */
+  protected void add(ManifestFile manifest) {
+    // the manifest must be rewritten with this update's snapshot ID
+    try (ManifestReader reader = ManifestReader.read(
+        ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
+      appendManifests.add(ManifestWriter.copyAppendManifest(
+          reader, manifestPath(manifestCount.getAndIncrement()), snapshotId(), summaryBuilder));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
+    }
+  }
+
   @Override
   protected Map<String, String> summary() {
     return summaryBuilder.build();
@@ -216,13 +232,17 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
       Set<CharSequenceWrapper> deletedFiles = Sets.newHashSet();
 
-      // group manifests by compatible partition specs to be merged
+      // filter any existing manifests
+      List<ManifestFile> filtered;
       if (current != null) {
         List<ManifestFile> manifests = current.manifests();
-        ManifestFile[] filtered = filterManifests(metricsEvaluator, manifests);
-        groupManifestsByPartitionSpec(groups, deletedFiles, filtered);
+        filtered = Arrays.asList(filterManifests(metricsEvaluator, manifests));
+      } else {
+        filtered = ImmutableList.of();
       }
 
+      groupManifestsByPartitionSpec(groups, deletedFiles, Iterables.concat(appendManifests, filtered));
+
       List<ManifestFile> manifests = Lists.newArrayList();
       for (Map.Entry<Integer, List<ManifestFile>> entry : groups.entrySet()) {
         int groupId = entry.getKey();
@@ -261,7 +281,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   private void groupManifestsByPartitionSpec(
       Map<Integer, List<ManifestFile>> groups,
-      Set<CharSequenceWrapper> deletedFiles, ManifestFile[] filtered) {
+      Set<CharSequenceWrapper> deletedFiles, Iterable<ManifestFile> filtered) {
     for (ManifestFile manifest : filtered) {
       PartitionSpec manifestSpec = ops.current().spec(manifest.partitionSpecId());
       Iterable<DataFile> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
@@ -320,14 +340,24 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     }
   }
 
-  @Override
-  protected void cleanUncommitted(Set<ManifestFile> committed) {
+  private void cleanUncommittedAppends(Set<ManifestFile> committed) {
     if (cachedNewManifest != null && !committed.contains(cachedNewManifest)) {
       deleteFile(cachedNewManifest.path());
       this.cachedNewManifest = null;
     }
+
+    for (ManifestFile manifest : appendManifests) {
+      if (!committed.contains(manifest)) {
+        deleteFile(manifest.path());
+      }
+    }
+  }
+
+  @Override
+  protected void cleanUncommitted(Set<ManifestFile> committed) {
     cleanUncommittedMerges(committed);
     cleanUncommittedFilters(committed);
+    cleanUncommittedAppends(committed);
   }
 
   private boolean nothingToFilter() {
@@ -439,7 +469,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
             deletedPaths.add(wrapper);
 
           } else {
-            writer.addExisting(entry);
+            writer.existing(entry);
           }
         }
       });
@@ -534,14 +564,14 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
               // suppress deletes from previous snapshots. only files deleted by this snapshot
               // should be added to the new manifest
               if (entry.snapshotId() == snapshotId()) {
-                writer.add(entry);
+                writer.addEntry(entry);
               }
             } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
               // adds from this snapshot are still adds, otherwise they should be existing
-              writer.add(entry);
+              writer.addEntry(entry);
             } else {
               // add all files from the old manifest as existing files
-              writer.addExisting(entry);
+              writer.existing(entry);
             }
           }
         }
diff --git a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
index 0077070..73a0505 100644
--- a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
+++ b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
@@ -236,7 +236,7 @@ public class ReplaceManifests extends SnapshotProducer<RewriteManifests> impleme
         writer = newWriter();
       }
 
-      writer.addExisting(entry);
+      writer.existing(entry);
       estimatedSize += len;
     }
 
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 65ce946..6c4f682 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import java.io.File;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
 import org.junit.Assert;
@@ -119,6 +121,23 @@ public class TableTestBase {
     return TestTables.readMetadata("test");
   }
 
+  ManifestFile writeManifest(DataFile... files) throws IOException {
+    File manifestFile = temp.newFile("input.m0.avro");
+    Assert.assertTrue(manifestFile.delete());
+    OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
+
+    ManifestWriter writer = ManifestWriter.write(table.spec(), outputFile);
+    try {
+      for (DataFile file : files) {
+        writer.add(file);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return writer.toManifestFile();
+  }
+
   void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
     List<ManifestFile> oldManifests = old != null ? old.manifests() : ImmutableList.of();
 
diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
index f036a85..e336f5f 100644
--- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import org.apache.iceberg.exceptions.CommitFailedException;
@@ -46,6 +47,45 @@ public class TestFastAppend extends TableTestBase {
   }
 
   @Test
+  public void testEmptyTableAppendManifest() throws IOException {
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+    TableMetadata base = readMetadata();
+    Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+    ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+    Snapshot pending = table.newFastAppend()
+        .appendManifest(manifest)
+        .apply();
+
+    validateSnapshot(base.currentSnapshot(), pending, FILE_A, FILE_B);
+  }
+
+  @Test
+  public void testEmptyTableAppendFilesAndManifest() throws IOException {
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+    TableMetadata base = readMetadata();
+    Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+    ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+    Snapshot pending = table.newFastAppend()
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .appendManifest(manifest)
+        .apply();
+
+    long pendingId = pending.snapshotId();
+
+    validateManifest(pending.manifests().get(0),
+        ids(pendingId, pendingId),
+        files(FILE_C, FILE_D));
+    validateManifest(pending.manifests().get(1),
+        ids(pendingId, pendingId),
+        files(FILE_A, FILE_B));
+  }
+
+  @Test
   public void testNonEmptyTableAppend() {
     table.newAppend()
         .appendFile(FILE_A)
@@ -171,6 +211,24 @@ public class TestFastAppend extends TableTestBase {
   }
 
   @Test
+  public void testAppendManifestCleanup() throws IOException {
+    // inject 5 failures
+    TestTables.TestTableOperations ops = table.ops();
+    ops.failCommits(5);
+
+    ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+    AppendFiles append = table.newFastAppend().appendManifest(manifest);
+    Snapshot pending = append.apply();
+    ManifestFile newManifest = pending.manifests().get(0);
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
+
+    AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
+        CommitFailedException.class, "Injected failure", append::commit);
+
+    Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
+  }
+
+  @Test
   public void testRecoveryWithManifestList() {
     table.updateProperties().set(TableProperties.MANIFEST_LISTS_ENABLED, "true").commit();
 
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 33b2b09..64d8948 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.File;
+import java.io.IOException;
 import java.util.Set;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.CommitFailedException;
@@ -52,6 +53,70 @@ public class TestMergeAppend extends TableTestBase {
   }
 
   @Test
+  public void testEmptyTableAppendManifest() throws IOException {
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+    TableMetadata base = readMetadata();
+    Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+    ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+    Snapshot pending = table.newAppend()
+        .appendManifest(manifest)
+        .apply();
+
+    validateSnapshot(base.currentSnapshot(), pending, FILE_A, FILE_B);
+  }
+
+  @Test
+  public void testEmptyTableAppendFilesAndManifest() throws IOException {
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+    TableMetadata base = readMetadata();
+    Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+    ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+    Snapshot pending = table.newAppend()
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .appendManifest(manifest)
+        .apply();
+
+    long pendingId = pending.snapshotId();
+
+    validateManifest(pending.manifests().get(0),
+        ids(pendingId, pendingId),
+        files(FILE_C, FILE_D));
+    validateManifest(pending.manifests().get(1),
+        ids(pendingId, pendingId),
+        files(FILE_A, FILE_B));
+  }
+
+  @Test
+  public void testMergeWithAppendFilesAndManifest() throws IOException {
+    // merge all manifests for this test
+    table.updateProperties().set("commit.manifest.min-count-to-merge", "1").commit();
+
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+    TableMetadata base = readMetadata();
+    Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+    ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+    Snapshot pending = table.newAppend()
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .appendManifest(manifest)
+        .apply();
+
+    long pendingId = pending.snapshotId();
+
+    Assert.assertEquals("Should create 1 merged manifest", 1, pending.manifests().size());
+    validateManifest(pending.manifests().get(0),
+        ids(pendingId, pendingId, pendingId, pendingId),
+        files(FILE_C, FILE_D, FILE_A, FILE_B));
+  }
+
+  @Test
   public void testMergeWithExistingManifest() {
     // merge all manifests for this test
     table.updateProperties().set("commit.manifest.min-count-to-merge", "1").commit();
@@ -341,6 +406,24 @@ public class TestMergeAppend extends TableTestBase {
   }
 
   @Test
+  public void testAppendManifestCleanup() throws IOException {
+    // inject 5 failures
+    TestTables.TestTableOperations ops = table.ops();
+    ops.failCommits(5);
+
+    ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+    AppendFiles append = table.newAppend().appendManifest(manifest);
+    Snapshot pending = append.apply();
+    ManifestFile newManifest = pending.manifests().get(0);
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
+
+    AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
+        CommitFailedException.class, "Injected failure", append::commit);
+
+    Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
+  }
+
+  @Test
   public void testRecovery() {
     // merge all manifests for this test
     table.updateProperties().set("commit.manifest.min-count-to-merge", "1").commit();