You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/06/10 15:43:29 UTC

[iceberg] branch master updated: Refactor MergingSnapshotProducer (#1098)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bd606f5  Refactor MergingSnapshotProducer (#1098)
bd606f5 is described below

commit bd606f56b14c49f39aa745534718d0cb6a50a8c9
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Wed Jun 10 08:43:19 2020 -0700

    Refactor MergingSnapshotProducer (#1098)
---
 .../expressions/StrictMetricsEvaluator.java        |   7 +-
 .../org/apache/iceberg/util/CharSequenceSet.java   |   5 +
 .../org/apache/iceberg/BaseReplacePartitions.java  |   2 +-
 .../org/apache/iceberg/ManifestFilterManager.java  | 426 +++++++++++++++
 .../org/apache/iceberg/ManifestMergeManager.java   | 192 +++++++
 .../apache/iceberg/MergingSnapshotProducer.java    | 595 ++++-----------------
 .../java/org/apache/iceberg/SnapshotSummary.java   |  12 +
 .../java/org/apache/iceberg/util/Exceptions.java   |  15 +
 .../java/org/apache/iceberg/TableTestBase.java     |  35 +-
 .../iceberg/TestSequenceNumberForV2Table.java      |  12 +-
 10 files changed, 797 insertions(+), 504 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
index 4a5598e..638eb25 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
@@ -41,7 +42,7 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
  * example, if a file's ts column has min X and max Y, this evaluator will return true for ts &lt; Y+1
  * but not for ts &lt; Y-1.
  * <p>
- * Files are passed to {@link #eval(DataFile)}, which returns true if all rows in the file must
+ * Files are passed to {@link #eval(ContentFile)}, which returns true if all rows in the file must
  * contain matching rows and false if the file may contain rows that do not match.
  */
 public class StrictMetricsEvaluator {
@@ -69,7 +70,7 @@ public class StrictMetricsEvaluator {
    * @param file a data file
    * @return false if the file cannot contain rows that match the expression, true otherwise.
    */
-  public boolean eval(DataFile file) {
+  public boolean eval(ContentFile<?> file) {
     // TODO: detect the case where a column is missing from the file using file's max field id.
     return visitor().eval(file);
   }
@@ -83,7 +84,7 @@ public class StrictMetricsEvaluator {
     private Map<Integer, ByteBuffer> lowerBounds = null;
     private Map<Integer, ByteBuffer> upperBounds = null;
 
-    private boolean eval(DataFile file) {
+    private boolean eval(ContentFile<?> file) {
       if (file.recordCount() <= 0) {
         return ROWS_MUST_MATCH;
       }
diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
index a724682..96d39f8 100644
--- a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
+++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -32,6 +33,10 @@ public class CharSequenceSet implements Set<CharSequence>, Serializable {
     return new CharSequenceSet(charSequences);
   }
 
+  /**
+   * Creates a new set of {@link CharSequence} values that starts with no elements.
+   *
+   * @return a freshly constructed, empty set; every call returns a distinct instance
+   */
+  public static Set<CharSequence> empty() {
+    ImmutableList<CharSequence> noElements = ImmutableList.of();
+    return new CharSequenceSet(noElements);
+  }
+
   private final Set<CharSequenceWrapper> wrapperSet;
   private final CharSequenceWrapper containsWrapper = CharSequenceWrapper.wrap(null);
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
index 6b6fb55..f991dcf 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -61,7 +61,7 @@ public class BaseReplacePartitions
 
     try {
       return super.apply(base);
-    } catch (DeleteException e) {
+    } catch (ManifestFilterManager.DeleteException e) {
       throw new ValidationException(
           "Cannot commit file that conflicts with existing partition: %s", e.partition());
     }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
new file mode 100644
index 0000000..7f0896a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.StrictMetricsEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.CharSequenceWrapper;
+import org.apache.iceberg.util.ManifestFileUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Removes content files from a set of manifests.
+ * <p>
+ * Files are selected for deletion by path, by partition tuple, or by a row filter expression. For each manifest
+ * that may contain a selected file, a filtered copy is written in which selected files are marked deleted and all
+ * other live files are kept as existing. Filtered copies are cached so that retried commits can reuse them.
+ *
+ * @param <F> the type of content file tracked by the manifests
+ */
+abstract class ManifestFilterManager<F extends ContentFile<F>> {
+  private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class);
+  private static final Joiner COMMA = Joiner.on(",");
+
+  /**
+   * Thrown when a file would be deleted after {@link #failAnyDelete()} was called.
+   */
+  protected static class DeleteException extends ValidationException {
+    private final String partition;
+
+    private DeleteException(String partition) {
+      super("Operation would delete existing data");
+      this.partition = partition;
+    }
+
+    /**
+     * @return the partition path of the file that would have been deleted
+     */
+    public String partition() {
+      return partition;
+    }
+  }
+
+  private final Set<CharSequence> deletePaths = CharSequenceSet.empty();
+  private final Set<StructLikeWrapper> deleteFilePartitions = Sets.newHashSet();
+  private final Set<StructLikeWrapper> dropPartitions = Sets.newHashSet();
+  private Expression deleteExpression = Expressions.alwaysFalse();
+  // set when a path was deleted without its partition, which makes deleteFilePartitions incomplete
+  private boolean hasPathOnlyDeletes = false;
+  private boolean failAnyDelete = false;
+  private boolean failMissingDeletePaths = false;
+
+  // cache filtered manifests to avoid extra work when commits fail.
+  private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
+
+  // tracking where files were deleted to validate retries quickly
+  private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
+      Maps.newConcurrentMap();
+
+  // number of duplicate path deletes found while writing each filtered manifest. this is tracked per manifest rather
+  // than in one shared counter because manifests are filtered concurrently and are re-filtered after the cache is
+  // invalidated; a shared counter would lose concurrent updates and count the same duplicates more than once.
+  private final Map<ManifestFile, Integer> filteredManifestToDuplicateCount = Maps.newConcurrentMap();
+
+  /**
+   * @param specId a partition spec ID read from a manifest
+   * @return the partition spec with that ID
+   */
+  protected abstract PartitionSpec spec(int specId);
+
+  /**
+   * Deletes the file at the given location; used to remove filtered manifests that were not committed.
+   */
+  protected abstract void deleteFile(String location);
+
+  /**
+   * @return a writer for a new manifest with the given partition spec
+   */
+  protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
+
+  /**
+   * @return a reader for the given manifest
+   */
+  protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);
+
+  /**
+   * Causes filtering to throw a {@link DeleteException} as soon as any file would be deleted.
+   */
+  protected void failAnyDelete() {
+    this.failAnyDelete = true;
+  }
+
+  /**
+   * Causes filtering to throw a {@link ValidationException} if any path passed to a delete method is not found.
+   */
+  protected void failMissingDeletePaths() {
+    this.failMissingDeletePaths = true;
+  }
+
+  /**
+   * Add a filter to match files to delete. A file will be deleted if all of the rows it contains
+   * match this or any other filter passed to this method.
+   *
+   * @param expr an expression to match rows.
+   */
+  protected void deleteByRowFilter(Expression expr) {
+    Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
+    invalidateFilteredCache();
+    this.deleteExpression = Expressions.or(deleteExpression, expr);
+  }
+
+  /**
+   * Add a partition tuple to drop from the table during the delete phase.
+   */
+  protected void dropPartition(StructLike partition) {
+    Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null");
+    invalidateFilteredCache();
+    dropPartitions.add(StructLikeWrapper.wrap(partition));
+  }
+
+  /**
+   * Add a specific file to be deleted in the new snapshot. Its partition is recorded so that manifests that cannot
+   * contain the partition can be skipped.
+   */
+  void delete(F file) {
+    Preconditions.checkNotNull(file, "Cannot delete file: null");
+    invalidateFilteredCache();
+    deletePaths.add(file.path());
+    deleteFilePartitions.add(StructLikeWrapper.wrap(file.partition()));
+  }
+
+  /**
+   * Add a specific path to be deleted in the new snapshot. Because the file's partition is unknown, every manifest
+   * with live files must be scanned.
+   */
+  void delete(CharSequence path) {
+    Preconditions.checkNotNull(path, "Cannot delete file path: null");
+    invalidateFilteredCache();
+    this.hasPathOnlyDeletes = true;
+    deletePaths.add(path);
+  }
+
+  /**
+   * Filter deleted files out of a list of manifests.
+   *
+   * @param tableSchema the current table schema
+   * @param manifests a list of manifests to be filtered; may be null or empty
+   * @return a list of filtered manifests, in the same order as the input
+   * @throws ValidationException if a required delete path is missing or a file only partially matches the filter
+   */
+  List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manifests) {
+    if (manifests == null || manifests.isEmpty()) {
+      validateRequiredDeletes();
+      return ImmutableList.of();
+    }
+
+    // use a common metrics evaluator for all manifests because it is bound to the table schema
+    StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(tableSchema, deleteExpression);
+
+    ManifestFile[] filtered = new ManifestFile[manifests.size()];
+    // open all of the manifest files in parallel, use index to avoid reordering
+    Tasks.range(filtered.length)
+        .stopOnFailure().throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(index -> {
+          ManifestFile manifest = filterManifest(metricsEvaluator, manifests.get(index));
+          filtered[index] = manifest;
+        });
+
+    validateRequiredDeletes(filtered);
+
+    return Arrays.asList(filtered);
+  }
+
+  /**
+   * Creates a snapshot summary builder with the files deleted from the set of filtered manifests.
+   * <p>
+   * Deleted files and duplicate deletes are both counted only for the manifests passed in, so the summary reflects
+   * the current set of filtered manifests even after retries.
+   *
+   * @param manifests a set of filtered manifests
+   */
+  SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
+    SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
+    int duplicateDeleteCount = 0;
+
+    for (ManifestFile manifest : manifests) {
+      PartitionSpec manifestSpec = spec(manifest.partitionSpecId());
+      Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+      if (manifestDeletes != null) {
+        for (F file : manifestDeletes) {
+          summaryBuilder.deletedFile(manifestSpec, file);
+        }
+      }
+
+      Integer manifestDuplicates = filteredManifestToDuplicateCount.get(manifest);
+      if (manifestDuplicates != null) {
+        duplicateDeleteCount += manifestDuplicates;
+      }
+    }
+
+    summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount);
+
+    return summaryBuilder;
+  }
+
+  /**
+   * Throws a {@link ValidationException} if any deleted file was not present in a filtered manifest.
+   *
+   * @param manifests a set of filtered manifests
+   */
+  private void validateRequiredDeletes(ManifestFile... manifests) {
+    if (failMissingDeletePaths) {
+      Set<CharSequence> deletedFiles = deletedFiles(manifests);
+      ValidationException.check(deletedFiles.containsAll(deletePaths),
+          "Missing required files to delete: %s",
+          COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
+    }
+  }
+
+  /**
+   * @return the paths of all files recorded as deleted in the given filtered manifests
+   */
+  private Set<CharSequence> deletedFiles(ManifestFile[] manifests) {
+    Set<CharSequence> deletedFiles = CharSequenceSet.empty();
+
+    if (manifests != null) {
+      for (ManifestFile manifest : manifests) {
+        Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+        if (manifestDeletes != null) {
+          for (F file : manifestDeletes) {
+            deletedFiles.add(file.path());
+          }
+        }
+      }
+    }
+
+    return deletedFiles;
+  }
+
+  /**
+   * Deletes filtered manifests that were created by this class, but are not in the committed manifest set, and
+   * removes them from every cache.
+   *
+   * @param committed the set of manifest files that were committed
+   */
+  void cleanUncommitted(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
+        Lists.newArrayList(filteredManifests.entrySet());
+
+    for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
+      // remove any new filtered manifests that aren't in the committed list
+      ManifestFile manifest = entry.getKey();
+      ManifestFile filtered = entry.getValue();
+      if (!committed.contains(filtered)) {
+        // only delete if the filtered copy was created
+        if (!manifest.equals(filtered)) {
+          deleteFile(filtered.path());
+        }
+
+        // remove the entry and its delete tracking from the caches so they do not grow across invalidations
+        filteredManifests.remove(manifest);
+        filteredManifestToDeletedFiles.remove(filtered);
+        filteredManifestToDuplicateCount.remove(filtered);
+      }
+    }
+  }
+
+  private void invalidateFilteredCache() {
+    cleanUncommitted(SnapshotProducer.EMPTY_SET);
+  }
+
+  /**
+   * @return the filtered manifest; this is the input manifest itself when no files are deleted from it
+   */
+  private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest) {
+    ManifestFile cached = filteredManifests.get(manifest);
+    if (cached != null) {
+      return cached;
+    }
+
+    boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
+    if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
+      filteredManifests.put(manifest, manifest);
+      return manifest;
+    }
+
+    try (ManifestReader<F> reader = newManifestReader(manifest)) {
+
+      // this is reused to compare file paths with the delete set
+      CharSequenceWrapper pathWrapper = CharSequenceWrapper.wrap("");
+
+      // reused to compare file partitions with the drop set
+      StructLikeWrapper partitionWrapper = StructLikeWrapper.wrap(null);
+
+      // this assumes that the manifest doesn't have files to remove and streams through the
+      // manifest without copying data. if a manifest does have a file to remove, this will break
+      // out of the loop and move on to filtering the manifest.
+      boolean hasDeletedFiles =
+          manifestHasDeletedFiles(metricsEvaluator, reader, pathWrapper, partitionWrapper);
+
+      if (!hasDeletedFiles) {
+        filteredManifests.put(manifest, manifest);
+        return manifest;
+      }
+
+      return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader, pathWrapper, partitionWrapper);
+
+    } catch (IOException e) {
+      throw new RuntimeIOException("Failed to close manifest: " + manifest, e);
+    }
+  }
+
+  /**
+   * Uses manifest-level metadata to decide whether the manifest could hold any file selected for deletion.
+   */
+  private boolean canContainDeletedFiles(ManifestFile manifest) {
+    boolean canContainExpressionDeletes;
+    if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
+      ManifestEvaluator manifestEvaluator =
+          ManifestEvaluator.forRowFilter(deleteExpression, spec(manifest.partitionSpecId()), true);
+      canContainExpressionDeletes = manifestEvaluator.eval(manifest);
+    } else {
+      canContainExpressionDeletes = false;
+    }
+
+    boolean canContainDroppedPartitions;
+    if (dropPartitions.size() > 0) {
+      canContainDroppedPartitions = ManifestFileUtil.canContainAny(
+          manifest,
+          Iterables.transform(dropPartitions, StructLikeWrapper::get),
+          this::spec);
+    } else {
+      canContainDroppedPartitions = false;
+    }
+
+    boolean canContainDroppedFiles;
+    if (hasPathOnlyDeletes) {
+      canContainDroppedFiles = true;
+    } else if (deletePaths.size() > 0) {
+      // because there were no path-only deletes, the set of deleted file partitions is valid
+      canContainDroppedFiles = ManifestFileUtil.canContainAny(
+          manifest,
+          Iterables.transform(deleteFilePartitions, StructLikeWrapper::get),
+          this::spec);
+    } else {
+      canContainDroppedFiles = false;
+    }
+
+    return canContainExpressionDeletes || canContainDroppedPartitions || canContainDroppedFiles;
+  }
+
+  /**
+   * Scans live entries and returns true as soon as one of them is selected for deletion.
+   * <p>
+   * Entries with status DELETED are skipped. They are tombstones for files that are already gone, so they must not
+   * trigger a {@link DeleteException}, a partial-match validation failure, or a rewrite of the manifest.
+   */
+  private boolean manifestHasDeletedFiles(
+      StrictMetricsEvaluator metricsEvaluator, ManifestReader<F> reader,
+      CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) {
+    Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
+    Evaluator strict = strictDeleteEvaluator(reader.spec());
+    boolean hasDeletedFiles = false;
+    for (ManifestEntry<F> entry : reader.entries()) {
+      if (entry.status() == ManifestEntry.Status.DELETED) {
+        continue;
+      }
+
+      F file = entry.file();
+      boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
+          dropPartitions.contains(partitionWrapper.set(file.partition()));
+      if (fileDelete || inclusive.eval(file.partition())) {
+        ValidationException.check(
+            fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
+            "Cannot delete file where some, but not all, rows match filter %s: %s",
+            this.deleteExpression, file.path());
+
+        hasDeletedFiles = true;
+        if (failAnyDelete) {
+          throw new DeleteException(reader.spec().partitionToPath(file.partition()));
+        }
+        break; // as soon as a deleted file is detected, stop scanning
+      }
+    }
+    return hasDeletedFiles;
+  }
+
+  /**
+   * Writes a copy of the manifest in which selected files are marked deleted and other live files are kept as
+   * existing, then records the copy, its deleted files, and its duplicate-delete count in the caches.
+   */
+  private ManifestFile filterManifestWithDeletedFiles(
+      StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<F> reader,
+      CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) {
+    Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
+    Evaluator strict = strictDeleteEvaluator(reader.spec());
+    // when this point is reached, there is at least one file that will be deleted in the
+    // manifest. produce a copy of the manifest with all deleted files removed.
+    List<F> deletedFiles = Lists.newArrayList();
+    Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
+    // local to this call because several manifests are filtered concurrently
+    int duplicateDeletes = 0;
+
+    try {
+      ManifestWriter<F> writer = newManifestWriter(reader.spec());
+      try {
+        for (ManifestEntry<F> entry : reader.entries()) {
+          if (entry.status() == ManifestEntry.Status.DELETED) {
+            // tombstones from earlier snapshots are not carried into the filtered copy
+            continue;
+          }
+
+          F file = entry.file();
+          boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
+              dropPartitions.contains(partitionWrapper.set(file.partition()));
+          if (fileDelete || inclusive.eval(file.partition())) {
+            ValidationException.check(
+                fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
+                "Cannot delete file where some, but not all, rows match filter %s: %s",
+                this.deleteExpression, file.path());
+
+            writer.delete(entry);
+
+            CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(file.path());
+            if (deletedPaths.add(wrapper)) {
+              // only add the file to deletes if it is a new delete
+              // this keeps the snapshot summary accurate for non-duplicate data
+              deletedFiles.add(file.copyWithoutStats());
+            } else {
+              LOG.warn("Deleting a duplicate path from manifest {}: {}",
+                  manifest.path(), wrapper.get());
+              duplicateDeletes += 1;
+            }
+
+          } else {
+            writer.existing(entry);
+          }
+        }
+      } finally {
+        writer.close();
+      }
+
+      // the writer is closed, so its output can be described as a manifest file
+      ManifestFile filtered = writer.toManifestFile();
+
+      // update caches
+      filteredManifestToDeletedFiles.put(filtered, deletedFiles);
+      filteredManifestToDuplicateCount.put(filtered, duplicateDeletes);
+      filteredManifests.put(manifest, filtered);
+
+      return filtered;
+
+    } catch (IOException e) {
+      throw new RuntimeIOException("Failed to close manifest writer", e);
+    }
+  }
+
+  private Evaluator strictDeleteEvaluator(PartitionSpec spec) {
+    Expression strictExpr = Projections.strict(spec).project(deleteExpression);
+    return new Evaluator(spec.partitionType(), strictExpr);
+  }
+
+  private Evaluator inclusiveDeleteEvaluator(PartitionSpec spec) {
+    Expression inclusiveExpr = Projections.inclusive(spec).project(deleteExpression);
+    return new Evaluator(spec.partitionType(), inclusiveExpr);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
new file mode 100644
index 0000000..2dc8148
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.util.BinPacking.ListPacker;
+import org.apache.iceberg.util.Exceptions;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+abstract class ManifestMergeManager<F extends ContentFile<F>> {
+  private final long targetSizeBytes;
+  private final int minCountToMerge;
+  private final boolean mergeEnabled;
+
+  // cache merge results to reuse when retrying
+  private final Map<List<ManifestFile>, ManifestFile> mergedManifests = Maps.newConcurrentMap();
+
+  ManifestMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) {
+    this.targetSizeBytes = targetSizeBytes;
+    this.minCountToMerge = minCountToMerge;
+    this.mergeEnabled = mergeEnabled;
+  }
+
+  protected abstract long snapshotId();
+  protected abstract PartitionSpec spec(int specId);
+  protected abstract void deleteFile(String location);
+  protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
+  protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);
+
+  Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
+    Iterator<ManifestFile> manifestIter = manifests.iterator();
+    if (!mergeEnabled || !manifestIter.hasNext()) {
+      return manifests;
+    }
+
+    ManifestFile first = manifestIter.next();
+
+    List<ManifestFile> merged = Lists.newArrayList();
+    ListMultimap<Integer, ManifestFile> groups = groupBySpec(first, manifestIter);
+    for (Integer specId : groups.keySet()) {
+      Iterables.addAll(merged, mergeGroup(first, specId, groups.get(specId)));
+    }
+
+    return merged;
+  }
+
+  void cleanUncommitted(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+        Lists.newArrayList(mergedManifests.entrySet());
+
+    for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
+      // delete any new merged manifests that aren't in the committed list
+      ManifestFile merged = entry.getValue();
+      if (!committed.contains(merged)) {
+        deleteFile(merged.path());
+        // remove the deleted file from the cache
+        mergedManifests.remove(entry.getKey());
+      }
+    }
+  }
+
+  private ListMultimap<Integer, ManifestFile> groupBySpec(ManifestFile first, Iterator<ManifestFile> remaining) {
+    ListMultimap<Integer, ManifestFile> groups = Multimaps.newListMultimap(
+        Maps.newTreeMap(Comparator.<Integer>reverseOrder()),
+        Lists::newArrayList);
+    groups.put(first.partitionSpecId(), first);
+    remaining.forEachRemaining(manifest -> groups.put(manifest.partitionSpecId(), manifest));
+    return groups;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Iterable<ManifestFile> mergeGroup(ManifestFile first, int specId, List<ManifestFile> group) {
+    // use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
+    // from the end so that the manifest that gets under-filled is the first one, which will be
+    // merged the next time.
+    ListPacker<ManifestFile> packer = new ListPacker<>(targetSizeBytes, 1, false);
+    List<List<ManifestFile>> bins = packer.packEnd(group, ManifestFile::length);
+
+    // process bins in parallel, but put results in the order of the bins into an array to preserve
+    // the order of manifests and contents. preserving the order helps avoid random deletes when
+    // data files are eventually aged off.
+    List<ManifestFile>[] binResults = (List<ManifestFile>[])
+        Array.newInstance(List.class, bins.size());
+
+    Tasks.range(bins.size())
+        .stopOnFailure().throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(index -> {
+          List<ManifestFile> bin = bins.get(index);
+          List<ManifestFile> outputManifests = Lists.newArrayList();
+          binResults[index] = outputManifests;
+
+          if (bin.size() == 1) {
+            // no need to rewrite
+            outputManifests.add(bin.get(0));
+            return;
+          }
+
+          // if the bin has the first manifest (the new data files or an appended manifest file) then only merge it
+          // if the number of manifests is above the minimum count. this is applied only to bins with an in-memory
+          // manifest so that large manifests don't prevent merging older groups.
+          if (bin.contains(first) && bin.size() < minCountToMerge) {
+            // not enough to merge, add all manifest files to the output list
+            outputManifests.addAll(bin);
+          } else {
+            // merge the group
+            outputManifests.add(createManifest(specId, bin));
+          }
+        });
+
+    return Iterables.concat(binResults);
+  }
+
+  private ManifestFile createManifest(int specId, List<ManifestFile> bin) {
+    // if this merge was already rewritten, use the existing file.
+    // if the new files are in this merge, then the ManifestFile for the new files has changed and
+    // will be a cache miss.
+    if (mergedManifests.containsKey(bin)) {
+      return mergedManifests.get(bin);
+    }
+
+    ManifestWriter<F> writer = newManifestWriter(spec(specId));
+    boolean threw = true;
+    try {
+      for (ManifestFile manifest : bin) {
+        try (ManifestReader<F> reader = newManifestReader(manifest)) {
+          for (ManifestEntry<F> entry : reader.entries()) {
+            if (entry.status() == Status.DELETED) {
+              // suppress deletes from previous snapshots. only files deleted by this snapshot
+              // should be added to the new manifest
+              if (entry.snapshotId() == snapshotId()) {
+                writer.delete(entry);
+              }
+            } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
+              // adds from this snapshot are still adds, otherwise they should be existing
+              writer.add(entry);
+            } else {
+              // add all files from the old manifest as existing files
+              writer.existing(entry);
+            }
+          }
+        } catch (IOException e) {
+          throw new RuntimeIOException("Failed to close manifest reader", e);
+        }
+      }
+      threw = false;
+
+    } finally {
+      Exceptions.close(writer, threw);
+    }
+
+    ManifestFile manifest = writer.toManifestFile();
+
+    // update the cache
+    mergedManifests.put(bin, manifest);
+
+    return manifest;
+
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 3076df3..057b45f 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -20,39 +20,18 @@
 package org.apache.iceberg;
 
 import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.Projections;
-import org.apache.iceberg.expressions.StrictMetricsEvaluator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.util.BinPacking.ListPacker;
-import org.apache.iceberg.util.CharSequenceWrapper;
-import org.apache.iceberg.util.ManifestFileUtil;
-import org.apache.iceberg.util.StructLikeWrapper;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
 import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT;
@@ -62,73 +41,91 @@ import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED
 import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;
 
 abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
-  private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class);
+  private final String tableName;
+  private final TableOperations ops;
+  private final PartitionSpec spec;
+  private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
+  private final ManifestMergeManager<DataFile> mergeManager;
+  private final ManifestFilterManager<DataFile> filterManager;
+  private final boolean snapshotIdInheritanceEnabled;
 
-  private static final Joiner COMMA = Joiner.on(",");
+  private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
+    @Override
+    protected PartitionSpec spec(int specId) {
+      return ops.current().spec(specId);
+    }
 
-  protected static class DeleteException extends ValidationException {
-    private final String partition;
+    @Override
+    protected void deleteFile(String location) {
+      MergingSnapshotProducer.this.deleteFile(location);
+    }
 
-    private DeleteException(String partition) {
-      super("Operation would delete existing data");
-      this.partition = partition;
+    @Override
+    protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
+      return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
     }
 
-    public String partition() {
-      return partition;
+    @Override
+    protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
+      return ManifestFiles.read(manifest, ops.io(), ops.current().specsById());
     }
   }
 
-  private final String tableName;
-  private final TableOperations ops;
-  private final PartitionSpec spec;
-  private final long manifestTargetSizeBytes;
-  private final int minManifestsCountToMerge;
-  private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
-  private final boolean mergeEnabled;
-  private final boolean snapshotIdInheritanceEnabled;
+  private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
+    DataFileMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) {
+      super(targetSizeBytes, minCountToMerge, mergeEnabled);
+    }
+
+    @Override
+    protected long snapshotId() {
+      return MergingSnapshotProducer.this.snapshotId();
+    }
+
+    @Override
+    protected PartitionSpec spec(int specId) {
+      return ops.current().spec(specId);
+    }
+
+    @Override
+    protected void deleteFile(String location) {
+      MergingSnapshotProducer.this.deleteFile(location);
+    }
+
+    @Override
+    protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
+      return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
+    }
+
+    @Override
+    protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
+      return ManifestFiles.read(manifest, ops.io(), ops.current().specsById());
+    }
+  }
 
   // update data
   private final List<DataFile> newFiles = Lists.newArrayList();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
   private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
-  private final Set<CharSequenceWrapper> deletePaths = Sets.newHashSet();
-  private final Set<StructLikeWrapper> deleteFilePartitions = Sets.newHashSet();
-  private final Set<StructLikeWrapper> dropPartitions = Sets.newHashSet();
   private Expression deleteExpression = Expressions.alwaysFalse();
-  private boolean hasPathOnlyDeletes = false;
-  private boolean failAnyDelete = false;
-  private boolean failMissingDeletePaths = false;
 
   // cache the new manifest once it is written
   private ManifestFile cachedNewManifest = null;
-  private ManifestFile firstAppendedManifest = null;
   private boolean hasNewFiles = false;
 
-  // cache merge results to reuse when retrying
-  private final Map<List<ManifestFile>, ManifestFile> mergeManifests = Maps.newConcurrentMap();
-
-  // cache filtered manifests to avoid extra work when commits fail.
-  private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
-
-  // tracking where files were deleted to validate retries quickly
-  private final Map<ManifestFile, Iterable<DataFile>> filteredManifestToDeletedFiles =
-      Maps.newConcurrentMap();
-
-  private boolean filterUpdated = false; // used to clear caches of filtered and merged manifests
-
   MergingSnapshotProducer(String tableName, TableOperations ops) {
     super(ops);
     this.tableName = tableName;
     this.ops = ops;
     this.spec = ops.current().spec();
-    this.manifestTargetSizeBytes = ops.current()
+    long targetSizeBytes = ops.current()
         .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
-    this.minManifestsCountToMerge = ops.current()
+    int minCountToMerge = ops.current()
         .propertyAsInt(MANIFEST_MIN_MERGE_COUNT, MANIFEST_MIN_MERGE_COUNT_DEFAULT);
-    this.mergeEnabled = ops.current()
+    boolean mergeEnabled = ops.current()
         .propertyAsBoolean(TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT);
+    this.mergeManager = new DataFileMergeManager(targetSizeBytes, minCountToMerge, mergeEnabled);
+    this.filterManager = new DataFileFilterManager();
     this.snapshotIdInheritanceEnabled = ops.current()
         .propertyAsBoolean(SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
   }
@@ -153,11 +150,11 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   protected void failAnyDelete() {
-    this.failAnyDelete = true;
+    filterManager.failAnyDelete();
   }
 
   protected void failMissingDeletePaths() {
-    this.failMissingDeletePaths = true;
+    filterManager.failMissingDeletePaths();
   }
 
   /**
@@ -167,36 +164,29 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
    * @param expr an expression to match rows.
    */
   protected void deleteByRowFilter(Expression expr) {
-    Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
-    this.filterUpdated = true;
-    this.deleteExpression = Expressions.or(deleteExpression, expr);
+    this.deleteExpression = expr;
+    filterManager.deleteByRowFilter(expr);
   }
 
   /**
    * Add a partition tuple to drop from the table during the delete phase.
    */
   protected void dropPartition(StructLike partition) {
-    dropPartitions.add(StructLikeWrapper.wrap(partition));
+    filterManager.dropPartition(partition);
   }
 
   /**
    * Add a specific path to be deleted in the new snapshot.
    */
   protected void delete(DataFile file) {
-    Preconditions.checkNotNull(file, "Cannot delete file: null");
-    this.filterUpdated = true;
-    deletePaths.add(CharSequenceWrapper.wrap(file.path()));
-    deleteFilePartitions.add(StructLikeWrapper.wrap(file.partition()));
+    filterManager.delete(file);
   }
 
   /**
    * Add a specific path to be deleted in the new snapshot.
    */
   protected void delete(CharSequence path) {
-    Preconditions.checkNotNull(path, "Cannot delete file path: null");
-    this.filterUpdated = true;
-    this.hasPathOnlyDeletes = true;
-    deletePaths.add(CharSequenceWrapper.wrap(path));
+    filterManager.delete(path);
   }
 
   /**
@@ -211,22 +201,13 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
    * Add all files in a manifest to the new snapshot.
    */
   protected void add(ManifestFile manifest) {
-    ManifestFile appendedManifest;
     if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
       appendedManifestsSummary.addedManifest(manifest);
       appendManifests.add(manifest);
-      appendedManifest = manifest;
     } else {
       // the manifest must be rewritten with this update's snapshot ID
       ManifestFile copiedManifest = copyManifest(manifest);
       rewrittenAppendManifests.add(copiedManifest);
-      appendedManifest = copiedManifest;
-    }
-
-    // keep reference of the first appended manifest, so that we can avoid merging first bin(s)
-    // which has the first appended manifest and have not crossed the limit of minManifestsCountToMerge
-    if (firstAppendedManifest == null) {
-      firstAppendedManifest = appendedManifest;
     }
   }
 
@@ -247,79 +228,29 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   public List<ManifestFile> apply(TableMetadata base) {
     summaryBuilder.clear();
     summaryBuilder.merge(appendedManifestsSummary);
-
-    if (filterUpdated) {
-      cleanUncommittedFilters(SnapshotProducer.EMPTY_SET);
-      this.filterUpdated = false;
-    }
+    Iterable<ManifestFile> newManifests = prepareNewManifests();
 
     Snapshot current = base.currentSnapshot();
-    Map<Integer, List<ManifestFile>> groups = Maps.newTreeMap(Comparator.<Integer>reverseOrder());
-
-    // use a common metrics evaluator for all manifests because it is bound to the table schema
-    StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(
-        ops.current().schema(), deleteExpression);
-
-    // add the current spec as the first group. files are added to the beginning.
-    try {
-      Iterable<ManifestFile> newManifests;
-      if (newFiles.size() > 0) {
-        // add all of the new files to the summary builder
-        for (DataFile file : newFiles) {
-          summaryBuilder.addedFile(spec, file);
-        }
-
-        ManifestFile newManifest = newFilesAsManifest();
-        newManifests = Iterables.concat(ImmutableList.of(newManifest), appendManifests, rewrittenAppendManifests);
-      } else {
-        newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests);
-      }
 
-      Iterable<ManifestFile> newManifestsWithMetadata = Iterables.transform(
-          newManifests,
-          manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
-
-      // filter any existing manifests
-      List<ManifestFile> filtered;
-      if (current != null) {
-        List<ManifestFile> manifests = current.dataManifests();
-        filtered = Arrays.asList(filterManifests(metricsEvaluator, manifests));
-      } else {
-        filtered = ImmutableList.of();
-      }
+    // filter any existing manifests
+    List<ManifestFile> filtered = filterManager.filterManifests(
+        base.schema(), current != null ? current.dataManifests() : null);
 
-      Iterable<ManifestFile> unmergedManifests = Iterables.filter(
-          Iterables.concat(newManifestsWithMetadata, filtered),
-          // only keep manifests that have live data files or that were written by this commit
-          manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId() == snapshotId());
+    Iterable<ManifestFile> unmergedManifests = Iterables.filter(
+        Iterables.concat(newManifests, filtered),
+        // only keep manifests that have live data files or that were written by this commit
+        manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId() == snapshotId());
 
-      Set<CharSequenceWrapper> deletedFiles = deletedFiles(unmergedManifests);
+    summaryBuilder.merge(filterManager.buildSummary(unmergedManifests));
 
-      List<ManifestFile> manifests = Lists.newArrayList();
-      if (mergeEnabled) {
-        groupManifestsByPartitionSpec(groups, unmergedManifests);
-        for (Map.Entry<Integer, List<ManifestFile>> entry : groups.entrySet()) {
-          Iterables.addAll(manifests, mergeGroup(entry.getKey(), entry.getValue()));
-        }
-      } else {
-        Iterables.addAll(manifests, unmergedManifests);
-      }
+    List<ManifestFile> manifests = Lists.newArrayList();
+    Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
 
-      if (current != null) {
-        manifests.addAll(current.deleteManifests());
-      }
-
-      ValidationException.check(!failMissingDeletePaths || deletedFiles.containsAll(deletePaths),
-          "Missing required files to delete: %s",
-          COMMA.join(Iterables.transform(Iterables.filter(deletePaths,
-              path -> !deletedFiles.contains(path)),
-              CharSequenceWrapper::get)));
-
-      return manifests;
-
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to create snapshot manifest list");
+    if (current != null) {
+      manifests.addAll(current.deleteManifests());
     }
+
+    return manifests;
   }
 
   @Override
@@ -334,87 +265,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
         summary());
   }
 
-  private ManifestFile[] filterManifests(StrictMetricsEvaluator metricsEvaluator, List<ManifestFile> manifests)
-      throws IOException {
-    ManifestFile[] filtered = new ManifestFile[manifests.size()];
-    // open all of the manifest files in parallel, use index to avoid reordering
-    Tasks.range(filtered.length)
-        .stopOnFailure().throwFailureWhenFinished()
-        .executeWith(ThreadPools.getWorkerPool())
-        .run(index -> {
-          ManifestFile manifest = filterManifest(metricsEvaluator, manifests.get(index));
-          filtered[index] = manifest;
-        }, IOException.class);
-    return filtered;
-  }
-
-  private Set<CharSequenceWrapper> deletedFiles(Iterable<ManifestFile> manifests) {
-    Set<CharSequenceWrapper> deletedFiles = Sets.newHashSet();
-
-    for (ManifestFile manifest : manifests) {
-      PartitionSpec manifestSpec = ops.current().spec(manifest.partitionSpecId());
-      Iterable<DataFile> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
-      if (manifestDeletes != null) {
-        for (DataFile file : manifestDeletes) {
-          summaryBuilder.deletedFile(manifestSpec, file);
-          deletedFiles.add(CharSequenceWrapper.wrap(file.path()));
-        }
-      }
-    }
-
-    return deletedFiles;
-  }
-
-  private void groupManifestsByPartitionSpec(Map<Integer, List<ManifestFile>> groups, Iterable<ManifestFile> filtered) {
-    for (ManifestFile manifest : filtered) {
-      List<ManifestFile> group = groups.get(manifest.partitionSpecId());
-      if (group != null) {
-        group.add(manifest);
-      } else {
-        group = Lists.newArrayList();
-        group.add(manifest);
-        groups.put(manifest.partitionSpecId(), group);
-      }
-    }
-  }
-
-  private void cleanUncommittedMerges(Set<ManifestFile> committed) {
-    // iterate over a copy of entries to avoid concurrent modification
-    List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
-        Lists.newArrayList(mergeManifests.entrySet());
-
-    for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
-      // delete any new merged manifests that aren't in the committed list
-      ManifestFile merged = entry.getValue();
-      if (!committed.contains(merged)) {
-        deleteFile(merged.path());
-        // remove the deleted file from the cache
-        mergeManifests.remove(entry.getKey());
-      }
-    }
-  }
-
-  private void cleanUncommittedFilters(Set<ManifestFile> committed) {
-    // iterate over a copy of entries to avoid concurrent modification
-    List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
-        Lists.newArrayList(filteredManifests.entrySet());
-
-    for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
-      // remove any new filtered manifests that aren't in the committed list
-      ManifestFile manifest = entry.getKey();
-      ManifestFile filtered = entry.getValue();
-      if (!committed.contains(filtered)) {
-        // only delete if the filtered copy was created
-        if (!manifest.equals(filtered)) {
-          deleteFile(filtered.path());
-        }
-
-        // remove the entry from the cache
-        filteredManifests.remove(manifest);
-      }
-    }
-  }
-
   private void cleanUncommittedAppends(Set<ManifestFile> committed) {
     if (cachedNewManifest != null && !committed.contains(cachedNewManifest)) {
       deleteFile(cachedNewManifest.path());
@@ -442,283 +292,50 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   @Override
   protected void cleanUncommitted(Set<ManifestFile> committed) {
-    cleanUncommittedMerges(committed);
-    cleanUncommittedFilters(committed);
+    mergeManager.cleanUncommitted(committed);
+    filterManager.cleanUncommitted(committed);
     cleanUncommittedAppends(committed);
   }
 
-  private boolean canContainDeletedFiles(ManifestFile manifest) {
-    boolean canContainExpressionDeletes;
-    if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
-      ManifestEvaluator manifestEvaluator =
-          ManifestEvaluator.forRowFilter(deleteExpression, ops.current().spec(), true);
-      canContainExpressionDeletes = manifestEvaluator.eval(manifest);
-    } else {
-      canContainExpressionDeletes = false;
-    }
-
-    boolean canContainDroppedPartitions;
-    if (dropPartitions.size() > 0) {
-      canContainDroppedPartitions = ManifestFileUtil.canContainAny(
-          manifest,
-          Iterables.transform(dropPartitions, StructLikeWrapper::get),
-          specId -> ops.current().spec(specId));
-    } else {
-      canContainDroppedPartitions = false;
-    }
-
-    boolean canContainDroppedFiles;
-    if (hasPathOnlyDeletes) {
-      canContainDroppedFiles = true;
-    } else if (deletePaths.size() > 0) {
-      // because there were no path-only deletes, the set of deleted file partitions is valid
-      canContainDroppedFiles = ManifestFileUtil.canContainAny(
-          manifest,
-          Iterables.transform(deleteFilePartitions, StructLikeWrapper::get),
-          specId -> ops.current().spec(specId));
-    } else {
-      canContainDroppedFiles = false;
-    }
-
-    return canContainExpressionDeletes || canContainDroppedPartitions || canContainDroppedFiles;
-  }
-
-  /**
-   * @return a ManifestReader that is a filtered version of the input manifest.
-   */
-  private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator,
-                                      ManifestFile manifest) throws IOException {
-    ManifestFile cached = filteredManifests.get(manifest);
-    if (cached != null) {
-      return cached;
-    }
-
-    boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
-    if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
-      filteredManifests.put(manifest, manifest);
-      return manifest;
-    }
-
-    try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
-
-      // this is reused to compare file paths with the delete set
-      CharSequenceWrapper pathWrapper = CharSequenceWrapper.wrap("");
-
-      // reused to compare file partitions with the drop set
-      StructLikeWrapper partitionWrapper = StructLikeWrapper.wrap(null);
-
-      // this assumes that the manifest doesn't have files to remove and streams through the
-      // manifest without copying data. if a manifest does have a file to remove, this will break
-      // out of the loop and move on to filtering the manifest.
-      boolean hasDeletedFiles =
-          manifestHasDeletedFiles(metricsEvaluator, reader, pathWrapper, partitionWrapper);
-
-      if (!hasDeletedFiles) {
-        filteredManifests.put(manifest, manifest);
-        return manifest;
+  private Iterable<ManifestFile> prepareNewManifests() {
+    Iterable<ManifestFile> newManifests;
+    if (newFiles.size() > 0) {
+      // add all of the new files to the summary builder
+      for (DataFile file : newFiles) {
+        summaryBuilder.addedFile(spec, file);
       }
 
-      return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader, pathWrapper,
-          partitionWrapper);
-    }
-  }
-
-  private boolean manifestHasDeletedFiles(
-      StrictMetricsEvaluator metricsEvaluator, ManifestReader<DataFile> reader,
-      CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) {
-    Evaluator inclusive = extractInclusiveDeleteExpression(reader);
-    Evaluator strict = extractStrictDeleteExpression(reader);
-    boolean hasDeletedFiles = false;
-    for (ManifestEntry<DataFile> entry : reader.entries()) {
-      DataFile file = entry.file();
-      boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
-          dropPartitions.contains(partitionWrapper.set(file.partition()));
-      if (fileDelete || inclusive.eval(file.partition())) {
-        ValidationException.check(
-            fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
-            "Cannot delete file where some, but not all, rows match filter %s: %s",
-            this.deleteExpression, file.path());
-
-        hasDeletedFiles = true;
-        if (failAnyDelete) {
-          throw new DeleteException(writeSpec().partitionToPath(file.partition()));
-        }
-        break; // as soon as a deleted file is detected, stop scanning
-      }
-    }
-    return hasDeletedFiles;
-  }
-
-  private ManifestFile filterManifestWithDeletedFiles(
-      StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<DataFile> reader,
-      CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) throws IOException {
-    Evaluator inclusive = extractInclusiveDeleteExpression(reader);
-    Evaluator strict = extractStrictDeleteExpression(reader);
-    // when this point is reached, there is at least one file that will be deleted in the
-    // manifest. produce a copy of the manifest with all deleted files removed.
-    List<DataFile> deletedFiles = Lists.newArrayList();
-    Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
-    ManifestWriter<DataFile> writer = newManifestWriter(reader.spec());
-    try {
-      reader.entries().forEach(entry -> {
-        DataFile file = entry.file();
-        boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
-            dropPartitions.contains(partitionWrapper.set(file.partition()));
-        if (entry.status() != Status.DELETED) {
-          if (fileDelete || inclusive.eval(file.partition())) {
-            ValidationException.check(
-                fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
-                "Cannot delete file where some, but not all, rows match filter %s: %s",
-                this.deleteExpression, file.path());
-
-            writer.delete(entry);
-
-            CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
-            if (deletedPaths.contains(wrapper)) {
-              LOG.warn("Deleting a duplicate path from manifest {}: {}",
-                  manifest.path(), wrapper.get());
-              summaryBuilder.incrementDuplicateDeletes();
-            } else {
-              // only add the file to deletes if it is a new delete
-              // this keeps the snapshot summary accurate for non-duplicate data
-              deletedFiles.add(entry.file().copyWithoutStats());
-            }
-            deletedPaths.add(wrapper);
-
-          } else {
-            writer.existing(entry);
-          }
-        }
-      });
-    } finally {
-      writer.close();
-    }
-
-    // return the filtered manifest as a reader
-    ManifestFile filtered = writer.toManifestFile();
-
-    // update caches
-    filteredManifests.put(manifest, filtered);
-    filteredManifestToDeletedFiles.put(filtered, deletedFiles);
-
-    return filtered;
-  }
-
-  private Evaluator extractStrictDeleteExpression(ManifestReader reader) {
-    Expression strictExpr = Projections
-        .strict(reader.spec())
-        .project(deleteExpression);
-    return new Evaluator(reader.spec().partitionType(), strictExpr);
-  }
-
-  private Evaluator extractInclusiveDeleteExpression(ManifestReader reader) {
-    Expression inclusiveExpr = Projections
-        .inclusive(reader.spec())
-        .project(deleteExpression);
-    return new Evaluator(reader.spec().partitionType(), inclusiveExpr);
-  }
-
-  @SuppressWarnings("unchecked")
-  private Iterable<ManifestFile> mergeGroup(int specId, List<ManifestFile> group)
-      throws IOException {
-    // use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
-    // from the end so that the manifest that gets under-filled is the first one, which will be
-    // merged the next time.
-    ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1, false);
-    List<List<ManifestFile>> bins = packer.packEnd(group, manifest -> manifest.length());
-
-    // process bins in parallel, but put results in the order of the bins into an array to preserve
-    // the order of manifests and contents. preserving the order helps avoid random deletes when
-    // data files are eventually aged off.
-    List<ManifestFile>[] binResults = (List<ManifestFile>[])
-        Array.newInstance(List.class, bins.size());
-    Tasks.range(bins.size())
-        .stopOnFailure().throwFailureWhenFinished()
-        .executeWith(ThreadPools.getWorkerPool())
-        .run(index -> {
-          List<ManifestFile> bin = bins.get(index);
-          List<ManifestFile> outputManifests = Lists.newArrayList();
-          binResults[index] = outputManifests;
-
-          if (bin.size() == 1) {
-            // no need to rewrite
-            outputManifests.add(bin.get(0));
-            return;
-          }
-
-          // if the bin has a new manifest (the new data files) or appended manifest file then only merge it
-          // if the number of manifests is above the minimum count. this is applied only to bins with an in-memory
-          // manifest so that large manifests don't prevent merging older groups.
-          if ((bin.contains(cachedNewManifest) || bin.contains(firstAppendedManifest)) &&
-              bin.size() < minManifestsCountToMerge) {
-            // not enough to merge, add all manifest files to the output list
-            outputManifests.addAll(bin);
-          } else {
-            // merge the group
-            outputManifests.add(createManifest(specId, bin));
-          }
-        }, IOException.class);
-
-    return Iterables.concat(binResults);
-  }
-
-  private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws IOException {
-    // if this merge was already rewritten, use the existing file.
-    // if the new files are in this merge, then the ManifestFile for the new files has changed and
-    // will be a cache miss.
-    if (mergeManifests.containsKey(bin)) {
-      return mergeManifests.get(bin);
-    }
-
-    ManifestWriter<DataFile> writer = newManifestWriter(ops.current().spec(specId));
-    try {
-      for (ManifestFile manifest : bin) {
-        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
-          for (ManifestEntry<DataFile> entry : reader.entries()) {
-            if (entry.status() == Status.DELETED) {
-              // suppress deletes from previous snapshots. only files deleted by this snapshot
-              // should be added to the new manifest
-              if (entry.snapshotId() == snapshotId()) {
-                writer.delete(entry);
-              }
-            } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
-              // adds from this snapshot are still adds, otherwise they should be existing
-              writer.add(entry);
-            } else {
-              // add all files from the old manifest as existing files
-              writer.existing(entry);
-            }
-          }
-        }
-      }
-    } finally {
-      writer.close();
+      ManifestFile newManifest = newFilesAsManifest();
+      newManifests = Iterables.concat(ImmutableList.of(newManifest), appendManifests, rewrittenAppendManifests);
+    } else {
+      newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests);
     }
 
-    ManifestFile manifest = writer.toManifestFile();
-
-    // update the cache
-    mergeManifests.put(bin, manifest);
-
-    return manifest;
+    return Iterables.transform(
+        newManifests,
+        manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
   }
 
-  private ManifestFile newFilesAsManifest() throws IOException {
+  private ManifestFile newFilesAsManifest() {
     if (hasNewFiles && cachedNewManifest != null) {
       deleteFile(cachedNewManifest.path());
       cachedNewManifest = null;
     }
 
     if (cachedNewManifest == null) {
-      ManifestWriter writer = newManifestWriter(spec);
       try {
-        writer.addAll(newFiles);
-      } finally {
-        writer.close();
-      }
+        ManifestWriter<DataFile> writer = newManifestWriter(spec);
+        try {
+          writer.addAll(newFiles);
+        } finally {
+          writer.close();
+        }
 
-      this.cachedNewManifest = writer.toManifestFile();
-      this.hasNewFiles = false;
+        this.cachedNewManifest = writer.toManifestFile();
+        this.hasNewFiles = false;
+      } catch (IOException e) {
+        throw new RuntimeIOException("Failed to close manifest writer", e);
+      }
     }
 
     return cachedNewManifest;
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
index 8101d5c..21038d0 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -68,6 +68,18 @@ public class SnapshotSummary {
       this.deletedDuplicateFiles += 1;
     }
 
+    public void incrementDuplicateDeletes(int increment) {
+      this.deletedDuplicateFiles += increment;
+    }
+
+    public void deletedFile(PartitionSpec spec, ContentFile<?> file) {
+      if (file instanceof DataFile) {
+        deletedFile(spec, (DataFile) file);
+      } else {
+        throw new IllegalArgumentException("Unsupported file type: " + file.getClass().getSimpleName());
+      }
+    }
+
     public void deletedFile(PartitionSpec spec, DataFile file) {
       changedPartitions.add(spec.partitionToPath(file.partition()));
       this.deletedFiles += 1;
diff --git a/core/src/main/java/org/apache/iceberg/util/Exceptions.java b/core/src/main/java/org/apache/iceberg/util/Exceptions.java
index 775b3eb..0594345 100644
--- a/core/src/main/java/org/apache/iceberg/util/Exceptions.java
+++ b/core/src/main/java/org/apache/iceberg/util/Exceptions.java
@@ -19,10 +19,25 @@
 
 package org.apache.iceberg.util;
 
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+
 public class Exceptions {
   private Exceptions() {
   }
 
+  public static void close(Closeable closeable, boolean suppressExceptions) {
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      if (!suppressExceptions) {
+        throw new RuntimeIOException("Failed calling close", e);
+      }
+      // otherwise, ignore the exception
+    }
+  }
+
   public static <E extends Exception> E suppressExceptions(E alreadyThrown, Runnable run) {
     try {
       run.run();
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 8ef9732..4e9f6a0 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -160,23 +160,32 @@ public class TableTestBase {
     return writer.toManifestFile();
   }
 
-  ManifestFile writeManifest(String fileName, ManifestEntry<DataFile>... entries) throws IOException {
+  ManifestFile writeManifest(String fileName, ManifestEntry<?>... entries) throws IOException {
     return writeManifest(null, fileName, entries);
   }
 
-  ManifestFile writeManifest(Long snapshotId, ManifestEntry<DataFile>... entries) throws IOException {
+  ManifestFile writeManifest(Long snapshotId, ManifestEntry<?>... entries) throws IOException {
     return writeManifest(snapshotId, "input.m0.avro", entries);
   }
 
-  ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry<DataFile>... entries) throws IOException {
+  @SuppressWarnings("unchecked")
+  <F extends ContentFile<F>> ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry<?>... entries)
+      throws IOException {
     File manifestFile = temp.newFile(fileName);
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
+    ManifestWriter<F> writer;
+    if (entries[0].file() instanceof DataFile) {
+      writer = (ManifestWriter<F>) ManifestFiles.write(
+          formatVersion, table.spec(), outputFile, snapshotId);
+    } else {
+      writer = (ManifestWriter<F>) ManifestFiles.writeDeleteManifest(
+          formatVersion, table.spec(), outputFile, snapshotId);
+    }
     try {
-      for (ManifestEntry<DataFile> entry : entries) {
-        writer.addEntry(entry);
+      for (ManifestEntry<?> entry : entries) {
+        writer.addEntry((ManifestEntry<F>) entry);
       }
     } finally {
       writer.close();
@@ -280,13 +289,21 @@ public class TableTestBase {
   void validateManifest(ManifestFile manifest,
                         Iterator<Long> ids,
                         Iterator<DataFile> expectedFiles) {
-    validateManifest(manifest, null, ids, expectedFiles);
+    validateManifest(manifest, null, ids, expectedFiles, null);
   }
 
   void validateManifest(ManifestFile manifest,
                         Iterator<Long> seqs,
                         Iterator<Long> ids,
                         Iterator<DataFile> expectedFiles) {
+    validateManifest(manifest, seqs, ids, expectedFiles, null);
+  }
+
+  void validateManifest(ManifestFile manifest,
+                        Iterator<Long> seqs,
+                        Iterator<Long> ids,
+                        Iterator<DataFile> expectedFiles,
+                        Iterator<ManifestEntry.Status> statuses) {
     for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
       DataFile file = entry.file();
       DataFile expected = expectedFiles.next();
@@ -298,6 +315,10 @@ public class TableTestBase {
           expected.path().toString(), file.path().toString());
       Assert.assertEquals("Snapshot ID should match expected ID",
           ids.next(), entry.snapshotId());
+      if (statuses != null) {
+        Assert.assertEquals("Status should match expected",
+            statuses.next(), entry.status());
+      }
     }
 
     Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java
index 55cefcf..57912d3 100644
--- a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java
+++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.junit.Test;
 
@@ -79,8 +80,11 @@ public class TestSequenceNumberForV2Table extends TableTestBase {
     manifestFile = snap4.allManifests().stream()
         .filter(manifest -> manifest.snapshotId() == commitId4)
         .collect(Collectors.toList()).get(0);
-    validateManifest(manifestFile, seqs(4, 3, 2, 1), ids(commitId4, commitId3, commitId2, commitId1),
-        files(FILE_D, FILE_C, FILE_B, FILE_A));
+    validateManifest(manifestFile,
+        seqs(4, 3, 2, 1),
+        ids(commitId4, commitId3, commitId2, commitId1),
+        files(FILE_D, FILE_C, FILE_B, FILE_A),
+        statuses(Status.ADDED, Status.EXISTING, Status.EXISTING, Status.EXISTING));
     V2Assert.assertEquals("Snapshot sequence number should be 4", 4, snap4.sequenceNumber());
     V2Assert.assertEquals("Last sequence number should be 4", 4, readMetadata().lastSequenceNumber());
   }
@@ -115,7 +119,7 @@ public class TestSequenceNumberForV2Table extends TableTestBase {
     V2Assert.assertEquals("Last sequence number should be 3", 3, readMetadata().lastSequenceNumber());
 
     // FILE_A and FILE_B in manifest may reorder
-    for (ManifestEntry entry : ManifestFiles.read(newManifest, FILE_IO).entries()) {
+    for (ManifestEntry<DataFile> entry : ManifestFiles.read(newManifest, FILE_IO).entries()) {
       if (entry.file().path().equals(FILE_A.path())) {
         V2Assert.assertEquals("FILE_A sequence number should be 1", 1, entry.sequenceNumber().longValue());
       }
@@ -272,7 +276,7 @@ public class TestSequenceNumberForV2Table extends TableTestBase {
     manifestFile = table.currentSnapshot().allManifests().stream()
         .filter(manifest -> manifest.snapshotId() == commitId4)
         .collect(Collectors.toList()).get(0);
-    validateManifest(manifestFile, seqs(3, 2, 4), ids(commitId3, commitId2, commitId4), files(FILE_C, FILE_B, FILE_A));
+    validateManifest(manifestFile, seqs(4), ids(commitId4), files(FILE_A), statuses(Status.DELETED));
     V2Assert.assertEquals("Snapshot sequence number should be 4", 4, snap4.sequenceNumber());
     V2Assert.assertEquals("Last sequence number should be 4", 4, readMetadata().lastSequenceNumber());
   }