You are viewing a plain-text version of this content; the hyperlink to the canonical version ("here") was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/12 18:01:52 UTC

[incubator-iceberg] branch master updated: Extend RewriteManifests to delete/add manifests directly (#512)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 70c1940  Extend RewriteManifests to delete/add manifests directly (#512)
70c1940 is described below

commit 70c1940209b9c444363322bf547b65d9320e0cea
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Nov 12 20:01:44 2019 +0200

    Extend RewriteManifests to delete/add manifests directly (#512)
---
 .../java/org/apache/iceberg/RewriteManifests.java  |  24 +-
 .../org/apache/iceberg/BaseRewriteManifests.java   | 152 ++++++--
 .../java/org/apache/iceberg/ManifestWriter.java    |  29 +-
 .../java/org/apache/iceberg/TableTestBase.java     |  31 ++
 .../org/apache/iceberg/TestRewriteManifests.java   | 414 +++++++++++++++++++++
 5 files changed, 612 insertions(+), 38 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/RewriteManifests.java b/api/src/main/java/org/apache/iceberg/RewriteManifests.java
index 1ed1ddf..74032eb 100644
--- a/api/src/main/java/org/apache/iceberg/RewriteManifests.java
+++ b/api/src/main/java/org/apache/iceberg/RewriteManifests.java
@@ -29,6 +29,10 @@ import java.util.function.Predicate;
  * described only by the manifest files that were added, and commits that snapshot as the
  * current.
  * <p>
+ * This API can be used to rewrite matching manifests according to a clustering function as well as
+ * to replace specific manifests. Manifests that are deleted or added directly are ignored during
+ * the rewrite process. The set of active files in replaced manifests must be the same as in new manifests.
+ * <p>
  * When committing, these changes will be applied to the latest table snapshot. Commit conflicts
  * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
  */
@@ -37,7 +41,8 @@ public interface RewriteManifests extends SnapshotUpdate<RewriteManifests> {
    * Groups an existing {@link DataFile} by a cluster key produced by a function. The cluster key
    * will determine which data file will be associated with a particular manifest. All data files
    * with the same cluster key will be written to the same manifest (unless the file is large and
-   * split into multiple files).
+   * split into multiple files). Manifests deleted via {@link #deleteManifest(ManifestFile)} or
+   * added via {@link #addManifest(ManifestFile)} are ignored during the rewrite process.
    *
    * @param func Function used to cluster data files to manifests.
    * @return this for method chaining
@@ -54,4 +59,21 @@ public interface RewriteManifests extends SnapshotUpdate<RewriteManifests> {
    * @return this for method chaining
    */
   RewriteManifests rewriteIf(Predicate<ManifestFile> predicate);
+
+  /**
+   * Deletes a {@link ManifestFile manifest file} from the table.
+   *
+   * @param manifest a manifest to delete
+   * @return this for method chaining
+   */
+  RewriteManifests deleteManifest(ManifestFile manifest);
+
+  /**
+   * Adds a {@link ManifestFile manifest file} to the table. The added manifest cannot contain new
+   * or deleted files.
+   *
+   * @param manifest a manifest to add
+   * @return this for method chaining
+   */
+  RewriteManifests addManifest(ManifestFile manifest);
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index c90d8d6..4a084b9 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -20,6 +20,8 @@
 package org.apache.iceberg;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -34,7 +36,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -43,27 +48,33 @@ import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFA
 
 
 public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> implements RewriteManifests {
+  private static final String KEPT_MANIFESTS_COUNT = "manifests-kept";
+  private static final String CREATED_MANIFESTS_COUNT = "manifests-created";
+  private static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced";
+  private static final String PROCESSED_ENTRY_COUNT = "entries-processed";
+
+  private static final Set<ManifestEntry.Status> ALLOWED_ENTRY_STATUSES = ImmutableSet.of(
+      ManifestEntry.Status.EXISTING);
+
   private final TableOperations ops;
   private final PartitionSpec spec;
   private final long manifestTargetSizeBytes;
 
+  private final Set<ManifestFile> deletedManifests = Sets.newHashSet();
+  private final List<ManifestFile> addedManifests = Lists.newArrayList();
+
   private final List<ManifestFile> keptManifests = Collections.synchronizedList(new ArrayList<>());
   private final List<ManifestFile> newManifests = Collections.synchronizedList(new ArrayList<>());
-  private final Set<ManifestFile> replacedManifests = Collections.synchronizedSet(new HashSet<>());
+  private final Set<ManifestFile> rewrittenManifests = Collections.synchronizedSet(new HashSet<>());
   private final Map<Object, WriterWrapper> writers = Collections.synchronizedMap(new HashMap<>());
 
   private final AtomicInteger manifestSuffix = new AtomicInteger(0);
   private final AtomicLong entryCount = new AtomicLong(0);
 
-  private final Map<String, String> summaryProps = new HashMap<>();
-
   private Function<DataFile, Object> clusterByFunc;
   private Predicate<ManifestFile> predicate;
 
-  private static final String REPLACED_CNT = "manifests-replaced";
-  private static final String KEPT_CNT = "manifests-kept";
-  private static final String NEW_CNT = "manifests-created";
-  private static final String ENTRY_CNT = "entries-processed";
+  private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
 
   BaseRewriteManifests(TableOperations ops) {
     super(ops);
@@ -85,19 +96,17 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
 
   @Override
   public RewriteManifests set(String property, String value) {
-    summaryProps.put(property, value);
+    summaryBuilder.set(property, value);
     return this;
   }
 
   @Override
   protected Map<String, String> summary() {
-    Map<String, String> result = new HashMap<>();
-    result.putAll(summaryProps);
-    result.put(KEPT_CNT, Integer.toString(keptManifests.size()));
-    result.put(NEW_CNT, Integer.toString(newManifests.size()));
-    result.put(REPLACED_CNT, Integer.toString(replacedManifests.size()));
-    result.put(ENTRY_CNT, Long.toString(entryCount.get()));
-    return result;
+    summaryBuilder.set(KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size()));
+    summaryBuilder.set(CREATED_MANIFESTS_COUNT, String.valueOf(newManifests.size() + addedManifests.size()));
+    summaryBuilder.set(REPLACED_MANIFESTS_COUNT, String.valueOf(rewrittenManifests.size() + deletedManifests.size()));
+    summaryBuilder.set(PROCESSED_ENTRY_COUNT, String.valueOf(entryCount.get()));
+    return summaryBuilder.build();
   }
 
   @Override
@@ -113,50 +122,84 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
   }
 
   @Override
-  public List<ManifestFile> apply(TableMetadata base) {
-    Preconditions.checkNotNull(clusterByFunc, "clusterBy function cannot be null");
+  public RewriteManifests deleteManifest(ManifestFile manifest) {
+    deletedManifests.add(manifest);
+    return this;
+  }
 
+  @Override
+  public RewriteManifests addManifest(ManifestFile manifest) {
+    try {
+      // the appended manifest must be rewritten with this update's snapshot ID
+      addedManifests.add(copyManifest(manifest));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Cannot append manifest: " + e.getMessage());
+    }
+    return this;
+  }
+
+  private ManifestFile copyManifest(ManifestFile manifest) {
+    Map<Integer, PartitionSpec> specsById = ops.current().specsById();
+    try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()), specsById)) {
+      OutputFile newFile = manifestPath(manifestSuffix.getAndIncrement());
+      return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
+    }
+  }
+
+  @Override
+  public List<ManifestFile> apply(TableMetadata base) {
     List<ManifestFile> currentManifests = base.currentSnapshot().manifests();
+    Set<ManifestFile> currentManifestSet = ImmutableSet.copyOf(currentManifests);
+
+    validateDeletedManifests(currentManifestSet);
 
-    if (requiresRewrite(currentManifests)) {
-      // run the rewrite process
+    if (requiresRewrite(currentManifestSet)) {
       performRewrite(currentManifests);
     } else {
-      // just keep any new manifests that were added since the last apply(), don't rerun
-      addExistingFromNewCommit(currentManifests);
+      keepActiveManifests(currentManifests);
     }
 
+    validateFilesCounts();
+
     // put new manifests at the beginning
     List<ManifestFile> apply = new ArrayList<>();
     apply.addAll(newManifests);
+    apply.addAll(addedManifests);
     apply.addAll(keptManifests);
 
     return apply;
   }
 
-  private boolean requiresRewrite(List<ManifestFile> currentManifests) {
-    if (replacedManifests.size() == 0) {
+  private boolean requiresRewrite(Set<ManifestFile> currentManifests) {
+    if (clusterByFunc == null) {
+      // manifests are deleted and added directly so don't perform a rewrite
+      return false;
+    }
+
+    if (rewrittenManifests.size() == 0) {
       // nothing yet processed so perform a full rewrite
       return true;
     }
+
     // if any processed manifest is not in the current manifest list, perform a full rewrite
-    Set<ManifestFile> set = Sets.newHashSet(currentManifests);
-    return replacedManifests.stream().anyMatch(manifest -> !set.contains(manifest));
+    return rewrittenManifests.stream().anyMatch(manifest -> !currentManifests.contains(manifest));
   }
 
-  private void addExistingFromNewCommit(List<ManifestFile> currentManifests) {
+  private void keepActiveManifests(List<ManifestFile> currentManifests) {
     // keep any existing manifests as-is that were not processed
     keptManifests.clear();
     currentManifests.stream()
-      .filter(manifest -> !replacedManifests.contains(manifest))
+      .filter(manifest -> !rewrittenManifests.contains(manifest) && !deletedManifests.contains(manifest))
       .forEach(manifest -> keptManifests.add(manifest));
   }
 
   private void reset() {
-    cleanAll();
+    cleanUncommitted(newManifests, ImmutableSet.of());
     entryCount.set(0);
     keptManifests.clear();
-    replacedManifests.clear();
+    rewrittenManifests.clear();
     newManifests.clear();
     writers.clear();
   }
@@ -164,14 +207,18 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
   private void performRewrite(List<ManifestFile> currentManifests) {
     reset();
 
+    List<ManifestFile> remainingManifests = currentManifests.stream()
+        .filter(manifest -> !deletedManifests.contains(manifest))
+        .collect(Collectors.toList());
+
     try {
-      Tasks.foreach(currentManifests)
+      Tasks.foreach(remainingManifests)
           .executeWith(ThreadPools.getWorkerPool())
           .run(manifest -> {
             if (predicate != null && !predicate.test(manifest)) {
               keptManifests.add(manifest);
             } else {
-              replacedManifests.add(manifest);
+              rewrittenManifests.add(manifest);
               try (ManifestReader reader =
                      ManifestReader.read(ops.io().newInputFile(manifest.path()), ops.current().specsById())) {
                 FilteredManifest filteredManifest = reader.select(Arrays.asList("*"));
@@ -189,6 +236,40 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
     }
   }
 
+  private void validateDeletedManifests(Set<ManifestFile> currentManifests) {
+    // directly deleted manifests must be still present in the current snapshot
+    deletedManifests.stream()
+        .filter(manifest -> !currentManifests.contains(manifest))
+        .findAny()
+        .ifPresent(manifest -> {
+          throw new ValidationException("Manifest is missing: %s", manifest.path());
+        });
+  }
+
+  private void validateFilesCounts() {
+    int createdManifestsFilesCount = activeFilesCount(newManifests) + activeFilesCount(addedManifests);
+    int replacedManifestsFilesCount = activeFilesCount(rewrittenManifests) + activeFilesCount(deletedManifests);
+
+    if (createdManifestsFilesCount != replacedManifestsFilesCount) {
+      throw new ValidationException(
+          "Replaced and created manifests must have the same number of active files: %d (new), %d (old)",
+          createdManifestsFilesCount, replacedManifestsFilesCount);
+    }
+  }
+
+  private int activeFilesCount(Iterable<ManifestFile> manifests) {
+    int activeFilesCount = 0;
+
+    for (ManifestFile manifest : manifests) {
+      Preconditions.checkNotNull(manifest.addedFilesCount(), "Missing file counts in %s", manifest.path());
+      Preconditions.checkNotNull(manifest.existingFilesCount(), "Missing file counts in %s", manifest.path());
+      activeFilesCount += manifest.addedFilesCount();
+      activeFilesCount += manifest.existingFilesCount();
+    }
+
+    return activeFilesCount;
+  }
+
   private void appendEntry(ManifestEntry entry, Object key) {
     Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
     Preconditions.checkNotNull(key, "Key cannot be null");
@@ -214,8 +295,13 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
 
   @Override
   protected void cleanUncommitted(Set<ManifestFile> committed) {
-    for (ManifestFile manifest : newManifests) {
-      if (!committed.contains(manifest)) {
+    cleanUncommitted(newManifests, committed);
+    cleanUncommitted(addedManifests, committed);
+  }
+
+  private void cleanUncommitted(Iterable<ManifestFile> manifests, Set<ManifestFile> committedManifests) {
+    for (ManifestFile manifest : manifests) {
+      if (!committedManifests.contains(manifest)) {
         deleteFile(manifest.path());
       }
     }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index eb53829..e7441e6 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -20,7 +20,9 @@
 package org.apache.iceberg;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.util.Set;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
@@ -36,14 +38,33 @@ public class ManifestWriter implements FileAppender<DataFile> {
 
   static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
                                          SnapshotSummary.Builder summaryBuilder) {
+    return copyManifest(reader, outputFile, snapshotId, summaryBuilder, Sets.newHashSet(ManifestEntry.Status.ADDED));
+  }
+
+  static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
+                                   SnapshotSummary.Builder summaryBuilder,
+                                   Set<ManifestEntry.Status> allowedEntryStatuses) {
     ManifestWriter writer = new ManifestWriter(reader.spec(), outputFile, snapshotId);
     boolean threw = true;
     try {
       for (ManifestEntry entry : reader.entries()) {
-        Preconditions.checkArgument(entry.status() == ManifestEntry.Status.ADDED,
-            "Cannot append manifest: contains existing files");
-        summaryBuilder.addedFile(reader.spec(), entry.file());
-        writer.add(entry);
+        Preconditions.checkArgument(
+            allowedEntryStatuses.contains(entry.status()),
+            "Invalid manifest entry status: %s (allowed statuses: %s)",
+            entry.status(), allowedEntryStatuses);
+        switch (entry.status()) {
+          case ADDED:
+            summaryBuilder.addedFile(reader.spec(), entry.file());
+            writer.add(entry);
+            break;
+          case EXISTING:
+            writer.existing(entry);
+            break;
+          case DELETED:
+            summaryBuilder.deletedFile(reader.spec(), entry.file());
+            writer.delete(entry);
+            break;
+        }
       }
 
       threw = false;
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 7857a99..9f98c21 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -140,6 +140,37 @@ public class TableTestBase {
     return writer.toManifestFile();
   }
 
+  ManifestFile writeManifest(String fileName, ManifestEntry... entries) throws IOException {
+    File manifestFile = temp.newFile(fileName);
+    Assert.assertTrue(manifestFile.delete());
+    OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
+
+    ManifestWriter writer = ManifestWriter.write(table.spec(), outputFile);
+    try {
+      for (ManifestEntry entry : entries) {
+        writer.addEntry(entry);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return writer.toManifestFile();
+  }
+
+  ManifestEntry manifestEntry(ManifestEntry.Status status, long snapshotId, DataFile file) {
+    ManifestEntry entry = new ManifestEntry(table.spec().partitionType());
+    switch (status) {
+      case ADDED:
+        return entry.wrapAppend(snapshotId, file);
+      case EXISTING:
+        return entry.wrapExisting(snapshotId, file);
+      case DELETED:
+        return entry.wrapDelete(snapshotId, file);
+      default:
+        throw new IllegalArgumentException("Unexpected entry status: " + status);
+    }
+  }
+
   void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
     List<ManifestFile> oldManifests = old != null ? old.manifests() : ImmutableList.of();
 
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index 669722a..340bc09 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -19,15 +19,19 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -329,4 +333,414 @@ public class TestRewriteManifests extends TableTestBase {
                             files(FILE_A),
                             statuses(ManifestEntry.Status.EXISTING));
   }
+
+  @Test
+  public void testBasicManifestReplacement() throws IOException {
+    Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+    List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+    Assert.assertEquals(1, firstSnapshotManifests.size());
+    ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+    table.newFastAppend()
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    ManifestFile firstNewManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+    ManifestFile secondNewManifest = writeManifest(
+        "manifest-file-2.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    rewriteManifests.deleteManifest(firstSnapshotManifest);
+    rewriteManifests.addManifest(firstNewManifest);
+    rewriteManifests.addManifest(secondNewManifest);
+    rewriteManifests.commit();
+
+    Snapshot snapshot = table.currentSnapshot();
+    List<ManifestFile> manifests = snapshot.manifests();
+    Assert.assertEquals(3, manifests.size());
+
+    validateSummary(snapshot, 1, 1, 2, 0);
+
+    validateManifestEntries(
+        manifests.get(0),
+        ids(firstSnapshot.snapshotId()),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    validateManifestEntries(
+        manifests.get(1),
+        ids(firstSnapshot.snapshotId()),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    validateManifestEntries(
+        manifests.get(2),
+        ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()),
+        files(FILE_C, FILE_D),
+        statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+  }
+
+  @Test
+  public void testManifestReplacementConcurrentAppend() throws IOException {
+    Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+    List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+    Assert.assertEquals(1, firstSnapshotManifests.size());
+    ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+    ManifestFile firstNewManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+    ManifestFile secondNewManifest = writeManifest(
+        "manifest-file-2.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    rewriteManifests.deleteManifest(firstSnapshotManifest);
+    rewriteManifests.addManifest(firstNewManifest);
+    rewriteManifests.addManifest(secondNewManifest);
+
+    table.newFastAppend()
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    Assert.assertEquals(2, table.currentSnapshot().manifests().size());
+
+    rewriteManifests.commit();
+
+    Snapshot snapshot = table.currentSnapshot();
+    List<ManifestFile> manifests = snapshot.manifests();
+    Assert.assertEquals(3, manifests.size());
+
+    validateSummary(snapshot, 1, 1, 2, 0);
+
+    validateManifestEntries(
+        manifests.get(0),
+        ids(firstSnapshot.snapshotId()),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    validateManifestEntries(
+        manifests.get(1),
+        ids(firstSnapshot.snapshotId()),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    validateManifestEntries(
+        manifests.get(2),
+        ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()),
+        files(FILE_C, FILE_D),
+        statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+  }
+
+  @Test
+  public void testManifestReplacementConcurrentDelete() throws IOException {
+    Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+    table.updateProperties()
+        .set(MANIFEST_MERGE_ENABLED, "false")
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+    List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+    Assert.assertEquals(1, firstSnapshotManifests.size());
+    ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+    table.newFastAppend()
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+    ManifestFile firstNewManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+    ManifestFile secondNewManifest = writeManifest(
+        "manifest-file-2.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    rewriteManifests.deleteManifest(firstSnapshotManifest);
+    rewriteManifests.addManifest(firstNewManifest);
+    rewriteManifests.addManifest(secondNewManifest);
+
+    table.newDelete()
+        .deleteFile(FILE_C)
+        .commit();
+    long thirdSnapshotId = table.currentSnapshot().snapshotId();
+
+    rewriteManifests.commit();
+
+    Snapshot snapshot = table.currentSnapshot();
+    List<ManifestFile> manifests = snapshot.manifests();
+    Assert.assertEquals(3, manifests.size());
+
+    validateSummary(snapshot, 1, 1, 2, 0);
+
+    validateManifestEntries(
+        manifests.get(0),
+        ids(firstSnapshot.snapshotId()),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    validateManifestEntries(
+        manifests.get(1),
+        ids(firstSnapshot.snapshotId()),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    validateManifestEntries(
+        manifests.get(2),
+        ids(thirdSnapshotId, secondSnapshotId),
+        files(FILE_C, FILE_D),
+        statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testManifestReplacementConcurrentConflictingDelete() throws IOException {
+    Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+    List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+    Assert.assertEquals(1, firstSnapshotManifests.size());
+    ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+    ManifestFile firstNewManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+    ManifestFile secondNewManifest = writeManifest(
+        "manifest-file-2.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    rewriteManifests.deleteManifest(firstSnapshotManifest);
+    rewriteManifests.addManifest(firstNewManifest);
+    rewriteManifests.addManifest(secondNewManifest);
+
+    table.newDelete()
+        .deleteFile(FILE_A)
+        .commit();
+
+    AssertHelpers.assertThrows("Should reject commit",
+        ValidationException.class, "Manifest is missing",
+        rewriteManifests::commit);
+  }
+
+  @Test
+  public void testManifestReplacementCombinedWithRewrite() throws IOException {
+    Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+    List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+    Assert.assertEquals(1, firstSnapshotManifests.size());
+    ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+    table.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    table.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_D)
+        .commit();
+
+    Assert.assertEquals(4, Iterables.size(table.snapshots()));
+
+    ManifestFile newManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+
+    table.rewriteManifests()
+        .deleteManifest(firstSnapshotManifest)
+        .addManifest(newManifest)
+        .clusterBy(dataFile -> "const-value")
+        .rewriteIf(manifest -> {
+          try (ManifestReader reader = ManifestReader.read(localInput(manifest.path()))) {
+            return !reader.iterator().next().path().equals(FILE_B.path());
+          } catch (IOException x) {
+            throw new RuntimeIOException(x);
+          }
+        })
+        .commit();
+
+    Snapshot snapshot = table.currentSnapshot();
+    List<ManifestFile> manifests = snapshot.manifests();
+    Assert.assertEquals(3, manifests.size());
+
+    validateSummary(snapshot, 3, 1, 2, 2);
+
+    validateManifestEntries(
+        manifests.get(1),
+        ids(firstSnapshot.snapshotId()),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    validateManifestEntries(
+        manifests.get(2),
+        ids(secondSnapshot.snapshotId()),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.ADDED));
+  }
+
+  @Test
+  public void testManifestReplacementCombinedWithRewriteConcurrentDelete() throws IOException {
+    Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+    table.updateProperties()
+        .set(MANIFEST_MERGE_ENABLED, "false")
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+    List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+    Assert.assertEquals(1, firstSnapshotManifests.size());
+    ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+    table.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    table.newFastAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    Assert.assertEquals(3, Iterables.size(table.snapshots()));
+
+    ManifestFile newManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests()
+        .deleteManifest(firstSnapshotManifest)
+        .addManifest(newManifest)
+        .clusterBy(dataFile -> "const-value");
+
+    rewriteManifests.apply();
+
+    table.newDelete()
+        .deleteFile(FILE_C)
+        .commit();
+
+    rewriteManifests.commit();
+
+    Snapshot snapshot = table.currentSnapshot();
+    List<ManifestFile> manifests = snapshot.manifests();
+    Assert.assertEquals(2, manifests.size());
+
+    validateSummary(snapshot, 3, 0, 2, 1);
+
+    validateManifestEntries(
+        manifests.get(0),
+        ids(secondSnapshot.snapshotId()),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.EXISTING));
+
+    validateManifestEntries(
+        manifests.get(1),
+        ids(firstSnapshot.snapshotId()),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+  }
+
+  @Test
+  public void testInvalidUsage() throws IOException {
+    Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    Snapshot snapshot = table.currentSnapshot();
+    List<ManifestFile> manifests = snapshot.manifests();
+    Assert.assertEquals(1, manifests.size());
+    ManifestFile manifest = manifests.get(0);
+
+    ManifestFile invalidAddedFileManifest = writeManifest(
+        "manifest-file-2.avro",
+        manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A));
+
+    AssertHelpers.assertThrows("Should reject commit",
+        IllegalArgumentException.class, "Cannot append manifest: Invalid manifest",
+        () -> table.rewriteManifests()
+            .deleteManifest(manifest)
+            .addManifest(invalidAddedFileManifest)
+            .commit());
+
+    ManifestFile invalidDeletedFileManifest = writeManifest(
+        "manifest-file-3.avro",
+        manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A));
+
+    AssertHelpers.assertThrows("Should reject commit",
+        IllegalArgumentException.class, "Cannot append manifest: Invalid manifest",
+        () -> table.rewriteManifests()
+            .deleteManifest(manifest)
+            .addManifest(invalidDeletedFileManifest)
+            .commit());
+
+    AssertHelpers.assertThrows("Should reject commit",
+        ValidationException.class, "must have the same number of active files",
+        () -> table.rewriteManifests()
+            .deleteManifest(manifest)
+            .commit());
+  }
+
+  private void validateSummary(Snapshot snapshot, int replaced, int kept, int created, int entryCount) {
+    Map<String, String> summary = snapshot.summary();
+    Assert.assertEquals(
+        "Replaced manifest count should match",
+        replaced, Integer.parseInt(summary.get("manifests-replaced")));
+    Assert.assertEquals(
+        "Kept manifest count should match",
+        kept, Integer.parseInt(summary.get("manifests-kept")));
+    Assert.assertEquals(
+        "Created manifest count should match",
+        created, Integer.parseInt(summary.get("manifests-created")));
+    Assert.assertEquals(
+        "Entry count should match",
+        entryCount, Integer.parseInt(summary.get("entries-processed")));
+  }
 }