Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/06/10 15:43:29 UTC
[iceberg] branch master updated: Refactor MergingSnapshotProducer (#1098)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bd606f5 Refactor MergingSnapshotProducer (#1098)
bd606f5 is described below
commit bd606f56b14c49f39aa745534718d0cb6a50a8c9
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Wed Jun 10 08:43:19 2020 -0700
Refactor MergingSnapshotProducer (#1098)
---
.../expressions/StrictMetricsEvaluator.java | 7 +-
.../org/apache/iceberg/util/CharSequenceSet.java | 5 +
.../org/apache/iceberg/BaseReplacePartitions.java | 2 +-
.../org/apache/iceberg/ManifestFilterManager.java | 426 +++++++++++++++
.../org/apache/iceberg/ManifestMergeManager.java | 192 +++++++
.../apache/iceberg/MergingSnapshotProducer.java | 595 ++++-----------------
.../java/org/apache/iceberg/SnapshotSummary.java | 12 +
.../java/org/apache/iceberg/util/Exceptions.java | 15 +
.../java/org/apache/iceberg/TableTestBase.java | 35 +-
.../iceberg/TestSequenceNumberForV2Table.java | 12 +-
10 files changed, 797 insertions(+), 504 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
index 4a5598e..638eb25 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
@@ -41,7 +42,7 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
* example, if a file's ts column has min X and max Y, this evaluator will return true for ts < Y+1
* but not for ts < Y-1.
* <p>
- * Files are passed to {@link #eval(DataFile)}, which returns true if all rows in the file must
+ * Files are passed to {@link #eval(ContentFile)}, which returns true if all rows in the file must
* contain matching rows and false if the file may contain rows that do not match.
*/
public class StrictMetricsEvaluator {
@@ -69,7 +70,7 @@ public class StrictMetricsEvaluator {
* @param file a data file
* @return false if the file cannot contain rows that match the expression, true otherwise.
*/
- public boolean eval(DataFile file) {
+ public boolean eval(ContentFile<?> file) {
// TODO: detect the case where a column is missing from the file using file's max field id.
return visitor().eval(file);
}
@@ -83,7 +84,7 @@ public class StrictMetricsEvaluator {
private Map<Integer, ByteBuffer> lowerBounds = null;
private Map<Integer, ByteBuffer> upperBounds = null;
- private boolean eval(DataFile file) {
+ private boolean eval(ContentFile<?> file) {
if (file.recordCount() <= 0) {
return ROWS_MUST_MATCH;
}
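As context for the signature change above: eval() previously required a DataFile and now accepts any ContentFile<?>, so one evaluator instance can serve other content file types as well. A minimal sketch of the widened usage, assuming a hypothetical single-column schema:

    import org.apache.iceberg.ContentFile;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.expressions.StrictMetricsEvaluator;
    import org.apache.iceberg.types.Types;

    public class StrictEvalSketch {
      // returns true only if every row in the file must match ts < 100
      static boolean mustMatch(ContentFile<?> file) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "ts", Types.LongType.get()));
        StrictMetricsEvaluator evaluator =
            new StrictMetricsEvaluator(schema, Expressions.lessThan("ts", 100L));
        return evaluator.eval(file);  // previously accepted only a DataFile
      }
    }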
diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
index a724682..96d39f8 100644
--- a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
+++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -32,6 +33,10 @@ public class CharSequenceSet implements Set<CharSequence>, Serializable {
return new CharSequenceSet(charSequences);
}
+ public static Set<CharSequence> empty() {
+ return new CharSequenceSet(ImmutableList.of());
+ }
+
private final Set<CharSequenceWrapper> wrapperSet;
private final CharSequenceWrapper containsWrapper = CharSequenceWrapper.wrap(null);
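The new empty() factory gives callers a content-compared set without passing an iterable. As a hedged aside on why this set type is used for delete paths rather than a plain HashSet<CharSequence>: entries are compared by character content, so different CharSequence implementations holding the same path are treated as equal. A hypothetical snippet:

    import java.util.Set;
    import org.apache.iceberg.util.CharSequenceSet;

    public class CharSequenceSetSketch {
      public static void main(String[] args) {
        Set<CharSequence> paths = CharSequenceSet.empty();
        paths.add(new StringBuilder("s3://bucket/data/file-a.parquet"));
        // matches by content, even though StringBuilder.equals is identity-based
        System.out.println(paths.contains("s3://bucket/data/file-a.parquet")); // true
      }
    }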
diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
index 6b6fb55..f991dcf 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -61,7 +61,7 @@ public class BaseReplacePartitions
try {
return super.apply(base);
- } catch (DeleteException e) {
+ } catch (ManifestFilterManager.DeleteException e) {
throw new ValidationException(
"Cannot commit file that conflicts with existing partition: %s", e.partition());
}
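Only the exception's home changes here; the validation behavior is the same. A hedged usage sketch of the path that raises it, where table and newFile are assumed inputs:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.exceptions.ValidationException;

    public class ReplacePartitionsSketch {
      static void replaceAppendOnly(Table table, DataFile newFile) {
        try {
          table.newReplacePartitions()
              .validateAppendOnly()  // triggers failAnyDelete() internally
              .addFile(newFile)
              .commit();
        } catch (ValidationException e) {
          // a DeleteException from the filter manager is rethrown here as:
          // "Cannot commit file that conflicts with existing partition: <partition>"
          System.err.println(e.getMessage());
        }
      }
    }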
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
new file mode 100644
index 0000000..7f0896a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.StrictMetricsEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.CharSequenceWrapper;
+import org.apache.iceberg.util.ManifestFileUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class ManifestFilterManager<F extends ContentFile<F>> {
+ private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class);
+ private static final Joiner COMMA = Joiner.on(",");
+
+ protected static class DeleteException extends ValidationException {
+ private final String partition;
+
+ private DeleteException(String partition) {
+ super("Operation would delete existing data");
+ this.partition = partition;
+ }
+
+ public String partition() {
+ return partition;
+ }
+ }
+
+ private final Set<CharSequence> deletePaths = CharSequenceSet.empty();
+ private final Set<StructLikeWrapper> deleteFilePartitions = Sets.newHashSet();
+ private final Set<StructLikeWrapper> dropPartitions = Sets.newHashSet();
+ private Expression deleteExpression = Expressions.alwaysFalse();
+ private boolean hasPathOnlyDeletes = false;
+ private boolean failAnyDelete = false;
+ private boolean failMissingDeletePaths = false;
+ private int duplicateDeleteCount = 0;
+
+ // cache filtered manifests to avoid extra work when commits fail.
+ private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
+
+ // tracking where files were deleted to validate retries quickly
+ private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
+ Maps.newConcurrentMap();
+
+ protected abstract PartitionSpec spec(int specId);
+ protected abstract void deleteFile(String location);
+ protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
+ protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);
+
+ protected void failAnyDelete() {
+ this.failAnyDelete = true;
+ }
+
+ protected void failMissingDeletePaths() {
+ this.failMissingDeletePaths = true;
+ }
+
+ /**
+ * Add a filter to match files to delete. A file will be deleted if all of the rows it contains
+ * match this or any other filter passed to this method.
+ *
+ * @param expr an expression to match rows.
+ */
+ protected void deleteByRowFilter(Expression expr) {
+ Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
+ invalidateFilteredCache();
+ this.deleteExpression = Expressions.or(deleteExpression, expr);
+ }
+
+ /**
+ * Add a partition tuple to drop from the table during the delete phase.
+ */
+ protected void dropPartition(StructLike partition) {
+ Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null");
+ invalidateFilteredCache();
+ dropPartitions.add(StructLikeWrapper.wrap(partition));
+ }
+
+ /**
+ * Add a specific path to be deleted in the new snapshot.
+ */
+ void delete(F file) {
+ Preconditions.checkNotNull(file, "Cannot delete file: null");
+ invalidateFilteredCache();
+ deletePaths.add(file.path());
+ deleteFilePartitions.add(StructLikeWrapper.wrap(file.partition()));
+ }
+
+ /**
+ * Add a specific path to be deleted in the new snapshot.
+ */
+ void delete(CharSequence path) {
+ Preconditions.checkNotNull(path, "Cannot delete file path: null");
+ invalidateFilteredCache();
+ this.hasPathOnlyDeletes = true;
+ deletePaths.add(path);
+ }
+
+ /**
+ * Filter deleted files out of a list of manifests.
+ *
+ * @param tableSchema the current table schema
+ * @param manifests a list of manifests to be filtered
+ * @return an array of filtered manifests
+ */
+ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manifests) {
+ if (manifests == null || manifests.isEmpty()) {
+ validateRequiredDeletes();
+ return ImmutableList.of();
+ }
+
+ // use a common metrics evaluator for all manifests because it is bound to the table schema
+ StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(tableSchema, deleteExpression);
+
+ ManifestFile[] filtered = new ManifestFile[manifests.size()];
+ // open all of the manifest files in parallel, use index to avoid reordering
+ Tasks.range(filtered.length)
+ .stopOnFailure().throwFailureWhenFinished()
+ .executeWith(ThreadPools.getWorkerPool())
+ .run(index -> {
+ ManifestFile manifest = filterManifest(metricsEvaluator, manifests.get(index));
+ filtered[index] = manifest;
+ });
+
+ validateRequiredDeletes(filtered);
+
+ return Arrays.asList(filtered);
+ }
+
+ /**
+ * Creates a snapshot summary builder with the files deleted from the set of filtered manifests.
+ *
+ * @param manifests a set of filtered manifests
+ */
+ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
+ SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
+
+ for (ManifestFile manifest : manifests) {
+ PartitionSpec manifestSpec = spec(manifest.partitionSpecId());
+ Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+ if (manifestDeletes != null) {
+ for (F file : manifestDeletes) {
+ summaryBuilder.deletedFile(manifestSpec, file);
+ }
+ }
+ }
+
+ summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount);
+
+ return summaryBuilder;
+ }
+
+ /**
+ * Throws a {@link ValidationException} if any deleted file was not present in a filtered manifest.
+ *
+ * @param manifests a set of filtered manifests
+ */
+ private void validateRequiredDeletes(ManifestFile... manifests) {
+ if (failMissingDeletePaths) {
+ Set<CharSequence> deletedFiles = deletedFiles(manifests);
+ ValidationException.check(deletedFiles.containsAll(deletePaths),
+ "Missing required files to delete: %s",
+ COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
+ }
+ }
+
+ private Set<CharSequence> deletedFiles(ManifestFile[] manifests) {
+ Set<CharSequence> deletedFiles = CharSequenceSet.empty();
+
+ if (manifests != null) {
+ for (ManifestFile manifest : manifests) {
+ Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+ if (manifestDeletes != null) {
+ for (F file : manifestDeletes) {
+ deletedFiles.add(file.path());
+ }
+ }
+ }
+ }
+
+ return deletedFiles;
+ }
+
+ /**
+ * Deletes filtered manifests that were created by this class, but are not in the committed manifest set.
+ *
+ * @param committed the set of manifest files that were committed
+ */
+ void cleanUncommitted(Set<ManifestFile> committed) {
+ // iterate over a copy of entries to avoid concurrent modification
+ List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
+ Lists.newArrayList(filteredManifests.entrySet());
+
+ for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
+ // remove any new filtered manifests that aren't in the committed list
+ ManifestFile manifest = entry.getKey();
+ ManifestFile filtered = entry.getValue();
+ if (!committed.contains(filtered)) {
+ // only delete if the filtered copy was created
+ if (!manifest.equals(filtered)) {
+ deleteFile(filtered.path());
+ }
+
+ // remove the entry from the cache
+ filteredManifests.remove(manifest);
+ }
+ }
+ }
+
+ private void invalidateFilteredCache() {
+ cleanUncommitted(SnapshotProducer.EMPTY_SET);
+ }
+
+ /**
+   * @return a ManifestFile that is a filtered version of the input manifest.
+ */
+ private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest) {
+ ManifestFile cached = filteredManifests.get(manifest);
+ if (cached != null) {
+ return cached;
+ }
+
+ boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
+ if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
+ filteredManifests.put(manifest, manifest);
+ return manifest;
+ }
+
+ try (ManifestReader<F> reader = newManifestReader(manifest)) {
+
+ // this is reused to compare file paths with the delete set
+ CharSequenceWrapper pathWrapper = CharSequenceWrapper.wrap("");
+
+ // reused to compare file partitions with the drop set
+ StructLikeWrapper partitionWrapper = StructLikeWrapper.wrap(null);
+
+ // this assumes that the manifest doesn't have files to remove and streams through the
+ // manifest without copying data. if a manifest does have a file to remove, this will break
+ // out of the loop and move on to filtering the manifest.
+ boolean hasDeletedFiles =
+ manifestHasDeletedFiles(metricsEvaluator, reader, pathWrapper, partitionWrapper);
+
+ if (!hasDeletedFiles) {
+ filteredManifests.put(manifest, manifest);
+ return manifest;
+ }
+
+ return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader, pathWrapper, partitionWrapper);
+
+ } catch (IOException e) {
+ throw new RuntimeIOException("Failed to close manifest: " + manifest, e);
+ }
+ }
+
+ private boolean canContainDeletedFiles(ManifestFile manifest) {
+ boolean canContainExpressionDeletes;
+ if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
+ ManifestEvaluator manifestEvaluator =
+ ManifestEvaluator.forRowFilter(deleteExpression, spec(manifest.partitionSpecId()), true);
+ canContainExpressionDeletes = manifestEvaluator.eval(manifest);
+ } else {
+ canContainExpressionDeletes = false;
+ }
+
+ boolean canContainDroppedPartitions;
+ if (dropPartitions.size() > 0) {
+ canContainDroppedPartitions = ManifestFileUtil.canContainAny(
+ manifest,
+ Iterables.transform(dropPartitions, StructLikeWrapper::get),
+ this::spec);
+ } else {
+ canContainDroppedPartitions = false;
+ }
+
+ boolean canContainDroppedFiles;
+ if (hasPathOnlyDeletes) {
+ canContainDroppedFiles = true;
+ } else if (deletePaths.size() > 0) {
+ // because there were no path-only deletes, the set of deleted file partitions is valid
+ canContainDroppedFiles = ManifestFileUtil.canContainAny(
+ manifest,
+ Iterables.transform(deleteFilePartitions, StructLikeWrapper::get),
+ this::spec);
+ } else {
+ canContainDroppedFiles = false;
+ }
+
+ return canContainExpressionDeletes || canContainDroppedPartitions || canContainDroppedFiles;
+ }
+
+ private boolean manifestHasDeletedFiles(
+ StrictMetricsEvaluator metricsEvaluator, ManifestReader<F> reader,
+ CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) {
+ Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
+ Evaluator strict = strictDeleteEvaluator(reader.spec());
+ boolean hasDeletedFiles = false;
+ for (ManifestEntry<F> entry : reader.entries()) {
+ F file = entry.file();
+ boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
+ dropPartitions.contains(partitionWrapper.set(file.partition()));
+ if (fileDelete || inclusive.eval(file.partition())) {
+ ValidationException.check(
+ fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
+ "Cannot delete file where some, but not all, rows match filter %s: %s",
+ this.deleteExpression, file.path());
+
+ hasDeletedFiles = true;
+ if (failAnyDelete) {
+ throw new DeleteException(reader.spec().partitionToPath(file.partition()));
+ }
+ break; // as soon as a deleted file is detected, stop scanning
+ }
+ }
+ return hasDeletedFiles;
+ }
+
+ private ManifestFile filterManifestWithDeletedFiles(
+ StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<F> reader,
+ CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) {
+ Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
+ Evaluator strict = strictDeleteEvaluator(reader.spec());
+ // when this point is reached, there is at least one file that will be deleted in the
+ // manifest. produce a copy of the manifest with all deleted files removed.
+ List<F> deletedFiles = Lists.newArrayList();
+ Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
+
+ try {
+ ManifestWriter<F> writer = newManifestWriter(reader.spec());
+ try {
+ reader.entries().forEach(entry -> {
+ F file = entry.file();
+ boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
+ dropPartitions.contains(partitionWrapper.set(file.partition()));
+ if (entry.status() != ManifestEntry.Status.DELETED) {
+ if (fileDelete || inclusive.eval(file.partition())) {
+ ValidationException.check(
+ fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
+ "Cannot delete file where some, but not all, rows match filter %s: %s",
+ this.deleteExpression, file.path());
+
+ writer.delete(entry);
+
+ CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
+ if (deletedPaths.contains(wrapper)) {
+ LOG.warn("Deleting a duplicate path from manifest {}: {}",
+ manifest.path(), wrapper.get());
+ duplicateDeleteCount += 1;
+ } else {
+ // only add the file to deletes if it is a new delete
+ // this keeps the snapshot summary accurate for non-duplicate data
+ deletedFiles.add(entry.file().copyWithoutStats());
+ }
+ deletedPaths.add(wrapper);
+
+ } else {
+ writer.existing(entry);
+ }
+ }
+ });
+ } finally {
+ writer.close();
+ }
+
+      // return the filtered manifest
+ ManifestFile filtered = writer.toManifestFile();
+
+ // update caches
+ filteredManifests.put(manifest, filtered);
+ filteredManifestToDeletedFiles.put(filtered, deletedFiles);
+
+ return filtered;
+
+ } catch (IOException e) {
+ throw new RuntimeIOException("Failed to close manifest writer", e);
+ }
+ }
+
+ private Evaluator strictDeleteEvaluator(PartitionSpec spec) {
+ Expression strictExpr = Projections.strict(spec).project(deleteExpression);
+ return new Evaluator(spec.partitionType(), strictExpr);
+ }
+
+ private Evaluator inclusiveDeleteEvaluator(PartitionSpec spec) {
+ Expression inclusiveExpr = Projections.inclusive(spec).project(deleteExpression);
+ return new Evaluator(spec.partitionType(), inclusiveExpr);
+ }
+}
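For reference, most callers reach this filtering code through a row-filter delete on the table API. A hedged sketch, with a hypothetical column name; the strict/metrics checks above reject any file where only some rows match:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expressions;

    public class DeleteByRowFilterSketch {
      static void dropOldRows(Table table) {
        // routes through deleteByRowFilter() into the new ManifestFilterManager
        table.newDelete()
            .deleteFromRowFilter(Expressions.lessThan("ts", 100L))
            .commit();
      }
    }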
diff --git a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
new file mode 100644
index 0000000..2dc8148
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.util.BinPacking.ListPacker;
+import org.apache.iceberg.util.Exceptions;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+abstract class ManifestMergeManager<F extends ContentFile<F>> {
+ private final long targetSizeBytes;
+ private final int minCountToMerge;
+ private final boolean mergeEnabled;
+
+ // cache merge results to reuse when retrying
+ private final Map<List<ManifestFile>, ManifestFile> mergedManifests = Maps.newConcurrentMap();
+
+ ManifestMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) {
+ this.targetSizeBytes = targetSizeBytes;
+ this.minCountToMerge = minCountToMerge;
+ this.mergeEnabled = mergeEnabled;
+ }
+
+ protected abstract long snapshotId();
+ protected abstract PartitionSpec spec(int specId);
+ protected abstract void deleteFile(String location);
+ protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
+ protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);
+
+ Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
+ Iterator<ManifestFile> manifestIter = manifests.iterator();
+ if (!mergeEnabled || !manifestIter.hasNext()) {
+ return manifests;
+ }
+
+ ManifestFile first = manifestIter.next();
+
+ List<ManifestFile> merged = Lists.newArrayList();
+ ListMultimap<Integer, ManifestFile> groups = groupBySpec(first, manifestIter);
+ for (Integer specId : groups.keySet()) {
+ Iterables.addAll(merged, mergeGroup(first, specId, groups.get(specId)));
+ }
+
+ return merged;
+ }
+
+ void cleanUncommitted(Set<ManifestFile> committed) {
+ // iterate over a copy of entries to avoid concurrent modification
+ List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+ Lists.newArrayList(mergedManifests.entrySet());
+
+ for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
+ // delete any new merged manifests that aren't in the committed list
+ ManifestFile merged = entry.getValue();
+ if (!committed.contains(merged)) {
+ deleteFile(merged.path());
+ // remove the deleted file from the cache
+ mergedManifests.remove(entry.getKey());
+ }
+ }
+ }
+
+ private ListMultimap<Integer, ManifestFile> groupBySpec(ManifestFile first, Iterator<ManifestFile> remaining) {
+ ListMultimap<Integer, ManifestFile> groups = Multimaps.newListMultimap(
+ Maps.newTreeMap(Comparator.<Integer>reverseOrder()),
+ Lists::newArrayList);
+ groups.put(first.partitionSpecId(), first);
+ remaining.forEachRemaining(manifest -> groups.put(manifest.partitionSpecId(), manifest));
+ return groups;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Iterable<ManifestFile> mergeGroup(ManifestFile first, int specId, List<ManifestFile> group) {
+ // use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
+ // from the end so that the manifest that gets under-filled is the first one, which will be
+ // merged the next time.
+ ListPacker<ManifestFile> packer = new ListPacker<>(targetSizeBytes, 1, false);
+ List<List<ManifestFile>> bins = packer.packEnd(group, ManifestFile::length);
+
+ // process bins in parallel, but put results in the order of the bins into an array to preserve
+ // the order of manifests and contents. preserving the order helps avoid random deletes when
+ // data files are eventually aged off.
+ List<ManifestFile>[] binResults = (List<ManifestFile>[])
+ Array.newInstance(List.class, bins.size());
+
+ Tasks.range(bins.size())
+ .stopOnFailure().throwFailureWhenFinished()
+ .executeWith(ThreadPools.getWorkerPool())
+ .run(index -> {
+ List<ManifestFile> bin = bins.get(index);
+ List<ManifestFile> outputManifests = Lists.newArrayList();
+ binResults[index] = outputManifests;
+
+ if (bin.size() == 1) {
+ // no need to rewrite
+ outputManifests.add(bin.get(0));
+ return;
+ }
+
+ // if the bin has the first manifest (the new data files or an appended manifest file) then only merge it
+ // if the number of manifests is above the minimum count. this is applied only to bins with an in-memory
+ // manifest so that large manifests don't prevent merging older groups.
+ if (bin.contains(first) && bin.size() < minCountToMerge) {
+ // not enough to merge, add all manifest files to the output list
+ outputManifests.addAll(bin);
+ } else {
+ // merge the group
+ outputManifests.add(createManifest(specId, bin));
+ }
+ });
+
+ return Iterables.concat(binResults);
+ }
+
+ private ManifestFile createManifest(int specId, List<ManifestFile> bin) {
+ // if this merge was already rewritten, use the existing file.
+ // if the new files are in this merge, then the ManifestFile for the new files has changed and
+ // will be a cache miss.
+ if (mergedManifests.containsKey(bin)) {
+ return mergedManifests.get(bin);
+ }
+
+ ManifestWriter<F> writer = newManifestWriter(spec(specId));
+ boolean threw = true;
+ try {
+ for (ManifestFile manifest : bin) {
+ try (ManifestReader<F> reader = newManifestReader(manifest)) {
+ for (ManifestEntry<F> entry : reader.entries()) {
+ if (entry.status() == Status.DELETED) {
+ // suppress deletes from previous snapshots. only files deleted by this snapshot
+ // should be added to the new manifest
+ if (entry.snapshotId() == snapshotId()) {
+ writer.delete(entry);
+ }
+ } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
+ // adds from this snapshot are still adds, otherwise they should be existing
+ writer.add(entry);
+ } else {
+ // add all files from the old manifest as existing files
+ writer.existing(entry);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException("Failed to close manifest reader", e);
+ }
+ }
+ threw = false;
+
+ } finally {
+ Exceptions.close(writer, threw);
+ }
+
+ ManifestFile manifest = writer.toManifestFile();
+
+ // update the cache
+ mergedManifests.put(bin, manifest);
+
+ return manifest;
+
+ }
+}
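To make the bin-packing step in mergeGroup() concrete, a small sketch of ListPacker with integer weights; the numbers and expected output are illustrative. packEnd packs from the end with lookback 1, so input order is preserved and any under-filled bin lands at the front:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.iceberg.util.BinPacking.ListPacker;

    public class PackEndSketch {
      public static void main(String[] args) {
        // target weight 6, lookback 1, pack from the end (largestBinFirst=false)
        ListPacker<Integer> packer = new ListPacker<>(6L, 1, false);
        List<List<Integer>> bins =
            packer.packEnd(Arrays.asList(4, 4, 4), w -> (long) w);
        // weights of 4 cannot pair under the target of 6: [[4], [4], [4]];
        // weights (2, 2, 2) would instead fit one bin: [[2, 2, 2]]
        System.out.println(bins);
      }
    }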
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 3076df3..057b45f 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -20,39 +20,18 @@
package org.apache.iceberg;
import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.Projections;
-import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.util.BinPacking.ListPacker;
-import org.apache.iceberg.util.CharSequenceWrapper;
-import org.apache.iceberg.util.ManifestFileUtil;
-import org.apache.iceberg.util.StructLikeWrapper;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT;
@@ -62,73 +41,91 @@ import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;
abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
- private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class);
+ private final String tableName;
+ private final TableOperations ops;
+ private final PartitionSpec spec;
+ private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
+ private final ManifestMergeManager<DataFile> mergeManager;
+ private final ManifestFilterManager<DataFile> filterManager;
+ private final boolean snapshotIdInheritanceEnabled;
- private static final Joiner COMMA = Joiner.on(",");
+ private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
+ @Override
+ protected PartitionSpec spec(int specId) {
+ return ops.current().spec(specId);
+ }
- protected static class DeleteException extends ValidationException {
- private final String partition;
+ @Override
+ protected void deleteFile(String location) {
+ MergingSnapshotProducer.this.deleteFile(location);
+ }
- private DeleteException(String partition) {
- super("Operation would delete existing data");
- this.partition = partition;
+ @Override
+ protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
+ return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
}
- public String partition() {
- return partition;
+ @Override
+ protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
+ return ManifestFiles.read(manifest, ops.io(), ops.current().specsById());
}
}
- private final String tableName;
- private final TableOperations ops;
- private final PartitionSpec spec;
- private final long manifestTargetSizeBytes;
- private final int minManifestsCountToMerge;
- private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
- private final boolean mergeEnabled;
- private final boolean snapshotIdInheritanceEnabled;
+ private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
+ DataFileMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) {
+ super(targetSizeBytes, minCountToMerge, mergeEnabled);
+ }
+
+ @Override
+ protected long snapshotId() {
+ return MergingSnapshotProducer.this.snapshotId();
+ }
+
+ @Override
+ protected PartitionSpec spec(int specId) {
+ return ops.current().spec(specId);
+ }
+
+ @Override
+ protected void deleteFile(String location) {
+ MergingSnapshotProducer.this.deleteFile(location);
+ }
+
+ @Override
+ protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
+ return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
+ }
+
+ @Override
+ protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
+ return ManifestFiles.read(manifest, ops.io(), ops.current().specsById());
+ }
+ }
// update data
private final List<DataFile> newFiles = Lists.newArrayList();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
- private final Set<CharSequenceWrapper> deletePaths = Sets.newHashSet();
- private final Set<StructLikeWrapper> deleteFilePartitions = Sets.newHashSet();
- private final Set<StructLikeWrapper> dropPartitions = Sets.newHashSet();
private Expression deleteExpression = Expressions.alwaysFalse();
- private boolean hasPathOnlyDeletes = false;
- private boolean failAnyDelete = false;
- private boolean failMissingDeletePaths = false;
// cache the new manifest once it is written
private ManifestFile cachedNewManifest = null;
- private ManifestFile firstAppendedManifest = null;
private boolean hasNewFiles = false;
- // cache merge results to reuse when retrying
- private final Map<List<ManifestFile>, ManifestFile> mergeManifests = Maps.newConcurrentMap();
-
- // cache filtered manifests to avoid extra work when commits fail.
- private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
-
- // tracking where files were deleted to validate retries quickly
- private final Map<ManifestFile, Iterable<DataFile>> filteredManifestToDeletedFiles =
- Maps.newConcurrentMap();
-
- private boolean filterUpdated = false; // used to clear caches of filtered and merged manifests
-
MergingSnapshotProducer(String tableName, TableOperations ops) {
super(ops);
this.tableName = tableName;
this.ops = ops;
this.spec = ops.current().spec();
- this.manifestTargetSizeBytes = ops.current()
+ long targetSizeBytes = ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
- this.minManifestsCountToMerge = ops.current()
+ int minCountToMerge = ops.current()
.propertyAsInt(MANIFEST_MIN_MERGE_COUNT, MANIFEST_MIN_MERGE_COUNT_DEFAULT);
- this.mergeEnabled = ops.current()
+ boolean mergeEnabled = ops.current()
.propertyAsBoolean(TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT);
+ this.mergeManager = new DataFileMergeManager(targetSizeBytes, minCountToMerge, mergeEnabled);
+ this.filterManager = new DataFileFilterManager();
this.snapshotIdInheritanceEnabled = ops.current()
.propertyAsBoolean(SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
}
@@ -153,11 +150,11 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
protected void failAnyDelete() {
- this.failAnyDelete = true;
+ filterManager.failAnyDelete();
}
protected void failMissingDeletePaths() {
- this.failMissingDeletePaths = true;
+ filterManager.failMissingDeletePaths();
}
/**
@@ -167,36 +164,29 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
* @param expr an expression to match rows.
*/
protected void deleteByRowFilter(Expression expr) {
- Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
- this.filterUpdated = true;
- this.deleteExpression = Expressions.or(deleteExpression, expr);
+ this.deleteExpression = expr;
+ filterManager.deleteByRowFilter(expr);
}
/**
* Add a partition tuple to drop from the table during the delete phase.
*/
protected void dropPartition(StructLike partition) {
- dropPartitions.add(StructLikeWrapper.wrap(partition));
+ filterManager.dropPartition(partition);
}
/**
* Add a specific path to be deleted in the new snapshot.
*/
protected void delete(DataFile file) {
- Preconditions.checkNotNull(file, "Cannot delete file: null");
- this.filterUpdated = true;
- deletePaths.add(CharSequenceWrapper.wrap(file.path()));
- deleteFilePartitions.add(StructLikeWrapper.wrap(file.partition()));
+ filterManager.delete(file);
}
/**
* Add a specific path to be deleted in the new snapshot.
*/
protected void delete(CharSequence path) {
- Preconditions.checkNotNull(path, "Cannot delete file path: null");
- this.filterUpdated = true;
- this.hasPathOnlyDeletes = true;
- deletePaths.add(CharSequenceWrapper.wrap(path));
+ filterManager.delete(path);
}
/**
@@ -211,22 +201,13 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
* Add all files in a manifest to the new snapshot.
*/
protected void add(ManifestFile manifest) {
- ManifestFile appendedManifest;
if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
appendedManifestsSummary.addedManifest(manifest);
appendManifests.add(manifest);
- appendedManifest = manifest;
} else {
// the manifest must be rewritten with this update's snapshot ID
ManifestFile copiedManifest = copyManifest(manifest);
rewrittenAppendManifests.add(copiedManifest);
- appendedManifest = copiedManifest;
- }
-
- // keep reference of the first appended manifest, so that we can avoid merging first bin(s)
- // which has the first appended manifest and have not crossed the limit of minManifestsCountToMerge
- if (firstAppendedManifest == null) {
- firstAppendedManifest = appendedManifest;
}
}
@@ -247,79 +228,29 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
public List<ManifestFile> apply(TableMetadata base) {
summaryBuilder.clear();
summaryBuilder.merge(appendedManifestsSummary);
-
- if (filterUpdated) {
- cleanUncommittedFilters(SnapshotProducer.EMPTY_SET);
- this.filterUpdated = false;
- }
+ Iterable<ManifestFile> newManifests = prepareNewManifests();
Snapshot current = base.currentSnapshot();
- Map<Integer, List<ManifestFile>> groups = Maps.newTreeMap(Comparator.<Integer>reverseOrder());
-
- // use a common metrics evaluator for all manifests because it is bound to the table schema
- StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(
- ops.current().schema(), deleteExpression);
-
- // add the current spec as the first group. files are added to the beginning.
- try {
- Iterable<ManifestFile> newManifests;
- if (newFiles.size() > 0) {
- // add all of the new files to the summary builder
- for (DataFile file : newFiles) {
- summaryBuilder.addedFile(spec, file);
- }
-
- ManifestFile newManifest = newFilesAsManifest();
- newManifests = Iterables.concat(ImmutableList.of(newManifest), appendManifests, rewrittenAppendManifests);
- } else {
- newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests);
- }
- Iterable<ManifestFile> newManifestsWithMetadata = Iterables.transform(
- newManifests,
- manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
-
- // filter any existing manifests
- List<ManifestFile> filtered;
- if (current != null) {
- List<ManifestFile> manifests = current.dataManifests();
- filtered = Arrays.asList(filterManifests(metricsEvaluator, manifests));
- } else {
- filtered = ImmutableList.of();
- }
+ // filter any existing manifests
+ List<ManifestFile> filtered = filterManager.filterManifests(
+ base.schema(), current != null ? current.dataManifests() : null);
- Iterable<ManifestFile> unmergedManifests = Iterables.filter(
- Iterables.concat(newManifestsWithMetadata, filtered),
- // only keep manifests that have live data files or that were written by this commit
- manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId() == snapshotId());
+ Iterable<ManifestFile> unmergedManifests = Iterables.filter(
+ Iterables.concat(newManifests, filtered),
+ // only keep manifests that have live data files or that were written by this commit
+ manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId() == snapshotId());
- Set<CharSequenceWrapper> deletedFiles = deletedFiles(unmergedManifests);
+ summaryBuilder.merge(filterManager.buildSummary(unmergedManifests));
- List<ManifestFile> manifests = Lists.newArrayList();
- if (mergeEnabled) {
- groupManifestsByPartitionSpec(groups, unmergedManifests);
- for (Map.Entry<Integer, List<ManifestFile>> entry : groups.entrySet()) {
- Iterables.addAll(manifests, mergeGroup(entry.getKey(), entry.getValue()));
- }
- } else {
- Iterables.addAll(manifests, unmergedManifests);
- }
+ List<ManifestFile> manifests = Lists.newArrayList();
+ Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
- if (current != null) {
- manifests.addAll(current.deleteManifests());
- }
-
- ValidationException.check(!failMissingDeletePaths || deletedFiles.containsAll(deletePaths),
- "Missing required files to delete: %s",
- COMMA.join(Iterables.transform(Iterables.filter(deletePaths,
- path -> !deletedFiles.contains(path)),
- CharSequenceWrapper::get)));
-
- return manifests;
-
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to create snapshot manifest list");
+ if (current != null) {
+ manifests.addAll(current.deleteManifests());
}
+
+ return manifests;
}
@Override
@@ -334,87 +265,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
summary());
}
- private ManifestFile[] filterManifests(StrictMetricsEvaluator metricsEvaluator, List<ManifestFile> manifests)
- throws IOException {
- ManifestFile[] filtered = new ManifestFile[manifests.size()];
- // open all of the manifest files in parallel, use index to avoid reordering
- Tasks.range(filtered.length)
- .stopOnFailure().throwFailureWhenFinished()
- .executeWith(ThreadPools.getWorkerPool())
- .run(index -> {
- ManifestFile manifest = filterManifest(metricsEvaluator, manifests.get(index));
- filtered[index] = manifest;
- }, IOException.class);
- return filtered;
- }
-
- private Set<CharSequenceWrapper> deletedFiles(Iterable<ManifestFile> manifests) {
- Set<CharSequenceWrapper> deletedFiles = Sets.newHashSet();
-
- for (ManifestFile manifest : manifests) {
- PartitionSpec manifestSpec = ops.current().spec(manifest.partitionSpecId());
- Iterable<DataFile> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
- if (manifestDeletes != null) {
- for (DataFile file : manifestDeletes) {
- summaryBuilder.deletedFile(manifestSpec, file);
- deletedFiles.add(CharSequenceWrapper.wrap(file.path()));
- }
- }
- }
-
- return deletedFiles;
- }
-
- private void groupManifestsByPartitionSpec(Map<Integer, List<ManifestFile>> groups, Iterable<ManifestFile> filtered) {
- for (ManifestFile manifest : filtered) {
- List<ManifestFile> group = groups.get(manifest.partitionSpecId());
- if (group != null) {
- group.add(manifest);
- } else {
- group = Lists.newArrayList();
- group.add(manifest);
- groups.put(manifest.partitionSpecId(), group);
- }
- }
- }
-
- private void cleanUncommittedMerges(Set<ManifestFile> committed) {
- // iterate over a copy of entries to avoid concurrent modification
- List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
- Lists.newArrayList(mergeManifests.entrySet());
-
- for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
- // delete any new merged manifests that aren't in the committed list
- ManifestFile merged = entry.getValue();
- if (!committed.contains(merged)) {
- deleteFile(merged.path());
- // remove the deleted file from the cache
- mergeManifests.remove(entry.getKey());
- }
- }
- }
-
- private void cleanUncommittedFilters(Set<ManifestFile> committed) {
- // iterate over a copy of entries to avoid concurrent modification
- List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
- Lists.newArrayList(filteredManifests.entrySet());
-
- for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
- // remove any new filtered manifests that aren't in the committed list
- ManifestFile manifest = entry.getKey();
- ManifestFile filtered = entry.getValue();
- if (!committed.contains(filtered)) {
- // only delete if the filtered copy was created
- if (!manifest.equals(filtered)) {
- deleteFile(filtered.path());
- }
-
- // remove the entry from the cache
- filteredManifests.remove(manifest);
- }
- }
- }
-
private void cleanUncommittedAppends(Set<ManifestFile> committed) {
if (cachedNewManifest != null && !committed.contains(cachedNewManifest)) {
deleteFile(cachedNewManifest.path());
@@ -442,283 +292,50 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
- cleanUncommittedMerges(committed);
- cleanUncommittedFilters(committed);
+ mergeManager.cleanUncommitted(committed);
+ filterManager.cleanUncommitted(committed);
cleanUncommittedAppends(committed);
}
- private boolean canContainDeletedFiles(ManifestFile manifest) {
- boolean canContainExpressionDeletes;
- if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
- ManifestEvaluator manifestEvaluator =
- ManifestEvaluator.forRowFilter(deleteExpression, ops.current().spec(), true);
- canContainExpressionDeletes = manifestEvaluator.eval(manifest);
- } else {
- canContainExpressionDeletes = false;
- }
-
- boolean canContainDroppedPartitions;
- if (dropPartitions.size() > 0) {
- canContainDroppedPartitions = ManifestFileUtil.canContainAny(
- manifest,
- Iterables.transform(dropPartitions, StructLikeWrapper::get),
- specId -> ops.current().spec(specId));
- } else {
- canContainDroppedPartitions = false;
- }
-
- boolean canContainDroppedFiles;
- if (hasPathOnlyDeletes) {
- canContainDroppedFiles = true;
- } else if (deletePaths.size() > 0) {
- // because there were no path-only deletes, the set of deleted file partitions is valid
- canContainDroppedFiles = ManifestFileUtil.canContainAny(
- manifest,
- Iterables.transform(deleteFilePartitions, StructLikeWrapper::get),
- specId -> ops.current().spec(specId));
- } else {
- canContainDroppedFiles = false;
- }
-
- return canContainExpressionDeletes || canContainDroppedPartitions || canContainDroppedFiles;
- }
-
- /**
- * @return a ManifestReader that is a filtered version of the input manifest.
- */
- private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator,
- ManifestFile manifest) throws IOException {
- ManifestFile cached = filteredManifests.get(manifest);
- if (cached != null) {
- return cached;
- }
-
- boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
- if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
- filteredManifests.put(manifest, manifest);
- return manifest;
- }
-
- try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
-
- // this is reused to compare file paths with the delete set
- CharSequenceWrapper pathWrapper = CharSequenceWrapper.wrap("");
-
- // reused to compare file partitions with the drop set
- StructLikeWrapper partitionWrapper = StructLikeWrapper.wrap(null);
-
- // this assumes that the manifest doesn't have files to remove and streams through the
- // manifest without copying data. if a manifest does have a file to remove, this will break
- // out of the loop and move on to filtering the manifest.
- boolean hasDeletedFiles =
- manifestHasDeletedFiles(metricsEvaluator, reader, pathWrapper, partitionWrapper);
-
- if (!hasDeletedFiles) {
- filteredManifests.put(manifest, manifest);
- return manifest;
+ private Iterable<ManifestFile> prepareNewManifests() {
+ Iterable<ManifestFile> newManifests;
+ if (newFiles.size() > 0) {
+ // add all of the new files to the summary builder
+ for (DataFile file : newFiles) {
+ summaryBuilder.addedFile(spec, file);
}
- return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader, pathWrapper,
- partitionWrapper);
- }
- }
-
- private boolean manifestHasDeletedFiles(
- StrictMetricsEvaluator metricsEvaluator, ManifestReader<DataFile> reader,
- CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) {
- Evaluator inclusive = extractInclusiveDeleteExpression(reader);
- Evaluator strict = extractStrictDeleteExpression(reader);
- boolean hasDeletedFiles = false;
- for (ManifestEntry<DataFile> entry : reader.entries()) {
- DataFile file = entry.file();
- boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
- dropPartitions.contains(partitionWrapper.set(file.partition()));
- if (fileDelete || inclusive.eval(file.partition())) {
- ValidationException.check(
- fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
- "Cannot delete file where some, but not all, rows match filter %s: %s",
- this.deleteExpression, file.path());
-
- hasDeletedFiles = true;
- if (failAnyDelete) {
- throw new DeleteException(writeSpec().partitionToPath(file.partition()));
- }
- break; // as soon as a deleted file is detected, stop scanning
- }
- }
- return hasDeletedFiles;
- }
-
- private ManifestFile filterManifestWithDeletedFiles(
- StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<DataFile> reader,
- CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) throws IOException {
- Evaluator inclusive = extractInclusiveDeleteExpression(reader);
- Evaluator strict = extractStrictDeleteExpression(reader);
- // when this point is reached, there is at least one file that will be deleted in the
- // manifest. produce a copy of the manifest with all deleted files removed.
- List<DataFile> deletedFiles = Lists.newArrayList();
- Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
- ManifestWriter<DataFile> writer = newManifestWriter(reader.spec());
- try {
- reader.entries().forEach(entry -> {
- DataFile file = entry.file();
- boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
- dropPartitions.contains(partitionWrapper.set(file.partition()));
- if (entry.status() != Status.DELETED) {
- if (fileDelete || inclusive.eval(file.partition())) {
- ValidationException.check(
- fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
- "Cannot delete file where some, but not all, rows match filter %s: %s",
- this.deleteExpression, file.path());
-
- writer.delete(entry);
-
- CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
- if (deletedPaths.contains(wrapper)) {
- LOG.warn("Deleting a duplicate path from manifest {}: {}",
- manifest.path(), wrapper.get());
- summaryBuilder.incrementDuplicateDeletes();
- } else {
- // only add the file to deletes if it is a new delete
- // this keeps the snapshot summary accurate for non-duplicate data
- deletedFiles.add(entry.file().copyWithoutStats());
- }
- deletedPaths.add(wrapper);
-
- } else {
- writer.existing(entry);
- }
- }
- });
- } finally {
- writer.close();
- }
-
- // return the filtered manifest as a reader
- ManifestFile filtered = writer.toManifestFile();
-
- // update caches
- filteredManifests.put(manifest, filtered);
- filteredManifestToDeletedFiles.put(filtered, deletedFiles);
-
- return filtered;
- }
-
- private Evaluator extractStrictDeleteExpression(ManifestReader reader) {
- Expression strictExpr = Projections
- .strict(reader.spec())
- .project(deleteExpression);
- return new Evaluator(reader.spec().partitionType(), strictExpr);
- }
-
- private Evaluator extractInclusiveDeleteExpression(ManifestReader reader) {
- Expression inclusiveExpr = Projections
- .inclusive(reader.spec())
- .project(deleteExpression);
- return new Evaluator(reader.spec().partitionType(), inclusiveExpr);
- }
-
- @SuppressWarnings("unchecked")
- private Iterable<ManifestFile> mergeGroup(int specId, List<ManifestFile> group)
- throws IOException {
- // use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
- // from the end so that the manifest that gets under-filled is the first one, which will be
- // merged the next time.
- ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1, false);
- List<List<ManifestFile>> bins = packer.packEnd(group, manifest -> manifest.length());
-
- // process bins in parallel, but put results in the order of the bins into an array to preserve
- // the order of manifests and contents. preserving the order helps avoid random deletes when
- // data files are eventually aged off.
- List<ManifestFile>[] binResults = (List<ManifestFile>[])
- Array.newInstance(List.class, bins.size());
- Tasks.range(bins.size())
- .stopOnFailure().throwFailureWhenFinished()
- .executeWith(ThreadPools.getWorkerPool())
- .run(index -> {
- List<ManifestFile> bin = bins.get(index);
- List<ManifestFile> outputManifests = Lists.newArrayList();
- binResults[index] = outputManifests;
-
- if (bin.size() == 1) {
- // no need to rewrite
- outputManifests.add(bin.get(0));
- return;
- }
-
- // if the bin has a new manifest (the new data files) or appended manifest file then only merge it
- // if the number of manifests is above the minimum count. this is applied only to bins with an in-memory
- // manifest so that large manifests don't prevent merging older groups.
- if ((bin.contains(cachedNewManifest) || bin.contains(firstAppendedManifest)) &&
- bin.size() < minManifestsCountToMerge) {
- // not enough to merge, add all manifest files to the output list
- outputManifests.addAll(bin);
- } else {
- // merge the group
- outputManifests.add(createManifest(specId, bin));
- }
- }, IOException.class);
-
- return Iterables.concat(binResults);
- }
-
- private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws IOException {
- // if this merge was already rewritten, use the existing file.
- // if the new files are in this merge, then the ManifestFile for the new files has changed and
- // will be a cache miss.
- if (mergeManifests.containsKey(bin)) {
- return mergeManifests.get(bin);
- }
-
- ManifestWriter<DataFile> writer = newManifestWriter(ops.current().spec(specId));
- try {
- for (ManifestFile manifest : bin) {
- try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
- for (ManifestEntry<DataFile> entry : reader.entries()) {
- if (entry.status() == Status.DELETED) {
- // suppress deletes from previous snapshots. only files deleted by this snapshot
- // should be added to the new manifest
- if (entry.snapshotId() == snapshotId()) {
- writer.delete(entry);
- }
- } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
- // adds from this snapshot are still adds, otherwise they should be existing
- writer.add(entry);
- } else {
- // add all files from the old manifest as existing files
- writer.existing(entry);
- }
- }
- }
- }
- } finally {
- writer.close();
+ ManifestFile newManifest = newFilesAsManifest();
+ newManifests = Iterables.concat(ImmutableList.of(newManifest), appendManifests, rewrittenAppendManifests);
+ } else {
+ newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests);
}
- ManifestFile manifest = writer.toManifestFile();
-
- // update the cache
- mergeManifests.put(bin, manifest);
-
- return manifest;
+ return Iterables.transform(
+ newManifests,
+ manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
}
- private ManifestFile newFilesAsManifest() throws IOException {
+ private ManifestFile newFilesAsManifest() {
if (hasNewFiles && cachedNewManifest != null) {
deleteFile(cachedNewManifest.path());
cachedNewManifest = null;
}
if (cachedNewManifest == null) {
- ManifestWriter writer = newManifestWriter(spec);
try {
- writer.addAll(newFiles);
- } finally {
- writer.close();
- }
+ ManifestWriter<DataFile> writer = newManifestWriter(spec);
+ try {
+ writer.addAll(newFiles);
+ } finally {
+ writer.close();
+ }
- this.cachedNewManifest = writer.toManifestFile();
- this.hasNewFiles = false;
+ this.cachedNewManifest = writer.toManifestFile();
+ this.hasNewFiles = false;
+ } catch (IOException e) {
+ throw new RuntimeIOException("Failed to close manifest writer", e);
+ }
}
return cachedNewManifest;
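The mergeGroup/createManifest logic removed above now lives in the new ManifestMergeManager (see the diffstat). For readers skimming the refactor, a simplified, self-contained sketch of the size-based bin packing and the merge decision follows; TARGET_SIZE_BYTES, MIN_COUNT_TO_MERGE, and the first-fit packer are illustrative stand-ins, not Iceberg's API.

    import java.util.ArrayList;
    import java.util.List;

    // Simplified sketch of grouping manifests into size-bounded bins and
    // deciding which bins to rewrite; thresholds and names are hypothetical.
    public class ManifestBinPackSketch {
      static final long TARGET_SIZE_BYTES = 8 * 1024 * 1024; // assumed target
      static final int MIN_COUNT_TO_MERGE = 100;             // assumed minimum

      // first-fit packing of manifest lengths into bins under the target size
      static List<List<Long>> pack(List<Long> manifestLengths) {
        List<List<Long>> bins = new ArrayList<>();
        List<Long> binSizes = new ArrayList<>();
        for (long length : manifestLengths) {
          boolean placed = false;
          for (int i = 0; i < bins.size(); i++) {
            if (binSizes.get(i) + length <= TARGET_SIZE_BYTES) {
              bins.get(i).add(length);
              binSizes.set(i, binSizes.get(i) + length);
              placed = true;
              break;
            }
          }
          if (!placed) {
            List<Long> bin = new ArrayList<>();
            bin.add(length);
            bins.add(bin);
            binSizes.add(length);
          }
        }
        return bins;
      }

      // single-manifest bins pass through; bins holding the in-memory manifest
      // wait for the minimum count so a large new manifest does not block
      // merging of older groups
      static boolean shouldMerge(List<Long> bin, boolean hasInMemoryManifest) {
        if (bin.size() == 1) {
          return false;
        }
        return !hasInMemoryManifest || bin.size() >= MIN_COUNT_TO_MERGE;
      }
    }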
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
index 8101d5c..21038d0 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -68,6 +68,18 @@ public class SnapshotSummary {
this.deletedDuplicateFiles += 1;
}
+ public void incrementDuplicateDeletes(int increment) {
+ this.deletedDuplicateFiles += increment;
+ }
+
+ public void deletedFile(PartitionSpec spec, ContentFile<?> file) {
+ if (file instanceof DataFile) {
+ deletedFile(spec, (DataFile) file);
+ } else {
+ throw new IllegalArgumentException("Unsupported file type: " + file.getClass().getSimpleName());
+ }
+ }
+
public void deletedFile(PartitionSpec spec, DataFile file) {
changedPartitions.add(spec.partitionToPath(file.partition()));
this.deletedFiles += 1;
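The new ContentFile overload above forwards to the typed DataFile method at runtime, leaving room for a delete-file branch later. A minimal sketch of that dispatch shape, using stand-in types rather than Iceberg's classes:

    // Stand-in sketch of narrowing a generic file reference to a typed
    // overload; Spec, File, Data, and Summary are illustrative types.
    public class DispatchSketch {
      interface File {}
      static class Data implements File {}
      static class Spec {}

      static class Summary {
        void deletedFile(Spec spec, File file) {
          if (file instanceof Data) {
            deletedFile(spec, (Data) file); // forward to the typed overload
          } else {
            throw new IllegalArgumentException(
                "Unsupported file type: " + file.getClass().getSimpleName());
          }
        }

        void deletedFile(Spec spec, Data file) {
          System.out.println("counted one deleted data file");
        }
      }

      public static void main(String[] args) {
        File file = new Data(); // static type File selects the generic overload
        new Summary().deletedFile(new Spec(), file);
      }
    }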
diff --git a/core/src/main/java/org/apache/iceberg/util/Exceptions.java b/core/src/main/java/org/apache/iceberg/util/Exceptions.java
index 775b3eb..0594345 100644
--- a/core/src/main/java/org/apache/iceberg/util/Exceptions.java
+++ b/core/src/main/java/org/apache/iceberg/util/Exceptions.java
@@ -19,10 +19,25 @@
package org.apache.iceberg.util;
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+
public class Exceptions {
private Exceptions() {
}
+ public static void close(Closeable closeable, boolean suppressExceptions) {
+ try {
+ closeable.close();
+ } catch (IOException e) {
+ if (!suppressExceptions) {
+ throw new RuntimeIOException("Failed calling close", e);
+ }
+ // otherwise, ignore the exception
+ }
+ }
+
public static <E extends Exception> E suppressExceptions(E alreadyThrown, Runnable run) {
try {
run.run();
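A usage sketch for the new close helper (the caller is illustrative; it assumes iceberg-core on the classpath): on the happy path a close failure should surface, but when cleanup runs because something already failed, suppressing it keeps the original exception from being masked.

    import java.io.Closeable;
    import org.apache.iceberg.util.Exceptions;

    public class CloseSketch {
      static void readAll(Closeable reader) {
        boolean failed = true;
        try {
          // ... read entries from the reader ...
          failed = false;
        } finally {
          // suppress close() failures only when already unwinding from an error
          Exceptions.close(reader, failed);
        }
      }
    }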
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 8ef9732..4e9f6a0 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -160,23 +160,32 @@ public class TableTestBase {
return writer.toManifestFile();
}
- ManifestFile writeManifest(String fileName, ManifestEntry<DataFile>... entries) throws IOException {
+ ManifestFile writeManifest(String fileName, ManifestEntry<?>... entries) throws IOException {
return writeManifest(null, fileName, entries);
}
- ManifestFile writeManifest(Long snapshotId, ManifestEntry<DataFile>... entries) throws IOException {
+ ManifestFile writeManifest(Long snapshotId, ManifestEntry<?>... entries) throws IOException {
return writeManifest(snapshotId, "input.m0.avro", entries);
}
- ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry<DataFile>... entries) throws IOException {
+ @SuppressWarnings("unchecked")
+ <F extends ContentFile<F>> ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry<?>... entries)
+ throws IOException {
File manifestFile = temp.newFile(fileName);
Assert.assertTrue(manifestFile.delete());
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
- ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
+ ManifestWriter<F> writer;
+ if (entries[0].file() instanceof DataFile) {
+ writer = (ManifestWriter<F>) ManifestFiles.write(
+ formatVersion, table.spec(), outputFile, snapshotId);
+ } else {
+ writer = (ManifestWriter<F>) ManifestFiles.writeDeleteManifest(
+ formatVersion, table.spec(), outputFile, snapshotId);
+ }
try {
- for (ManifestEntry<DataFile> entry : entries) {
- writer.addEntry(entry);
+ for (ManifestEntry<?> entry : entries) {
+ writer.addEntry((ManifestEntry<F>) entry);
}
} finally {
writer.close();
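The generified writeManifest helper above picks a data or delete manifest writer from the runtime type of the first entry's file, hiding the unchecked cast behind @SuppressWarnings. A self-contained sketch of that shape with stand-in types:

    import java.util.List;

    // Illustrative sketch of selecting a typed writer by inspecting the first
    // element; Entry, DataFile, and Writer are stand-ins, not Iceberg's API.
    public class WriterDispatchSketch {
      interface Entry { Object file(); }
      static class DataFile {}
      static class DeleteFile {}

      interface Writer<T> { void add(T entry); }

      @SuppressWarnings("unchecked")
      static <T extends Entry> Writer<T> writerFor(List<T> entries) {
        // the cast is safe only if every entry carries the same file type as
        // the first one, which mirrors the test helper's assumption
        if (entries.get(0).file() instanceof DataFile) {
          return (Writer<T>) dataWriter();
        } else {
          return (Writer<T>) deleteWriter();
        }
      }

      static Writer<Entry> dataWriter() {
        return entry -> System.out.println("wrote data entry");
      }

      static Writer<Entry> deleteWriter() {
        return entry -> System.out.println("wrote delete entry");
      }
    }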
@@ -280,13 +289,21 @@ public class TableTestBase {
void validateManifest(ManifestFile manifest,
Iterator<Long> ids,
Iterator<DataFile> expectedFiles) {
- validateManifest(manifest, null, ids, expectedFiles);
+ validateManifest(manifest, null, ids, expectedFiles, null);
}
void validateManifest(ManifestFile manifest,
Iterator<Long> seqs,
Iterator<Long> ids,
Iterator<DataFile> expectedFiles) {
+ validateManifest(manifest, seqs, ids, expectedFiles, null);
+ }
+
+ void validateManifest(ManifestFile manifest,
+ Iterator<Long> seqs,
+ Iterator<Long> ids,
+ Iterator<DataFile> expectedFiles,
+ Iterator<ManifestEntry.Status> statuses) {
for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
DataFile file = entry.file();
DataFile expected = expectedFiles.next();
@@ -298,6 +315,10 @@ public class TableTestBase {
expected.path().toString(), file.path().toString());
Assert.assertEquals("Snapshot ID should match expected ID",
ids.next(), entry.snapshotId());
+ if (statuses != null) {
+ Assert.assertEquals("Status should match expected",
+ statuses.next(), entry.status());
+ }
}
Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java
index 55cefcf..57912d3 100644
--- a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java
+++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.junit.Test;
@@ -79,8 +80,11 @@ public class TestSequenceNumberForV2Table extends TableTestBase {
manifestFile = snap4.allManifests().stream()
.filter(manifest -> manifest.snapshotId() == commitId4)
.collect(Collectors.toList()).get(0);
- validateManifest(manifestFile, seqs(4, 3, 2, 1), ids(commitId4, commitId3, commitId2, commitId1),
- files(FILE_D, FILE_C, FILE_B, FILE_A));
+ validateManifest(manifestFile,
+ seqs(4, 3, 2, 1),
+ ids(commitId4, commitId3, commitId2, commitId1),
+ files(FILE_D, FILE_C, FILE_B, FILE_A),
+ statuses(Status.ADDED, Status.EXISTING, Status.EXISTING, Status.EXISTING));
V2Assert.assertEquals("Snapshot sequence number should be 4", 4, snap4.sequenceNumber());
V2Assert.assertEquals("Last sequence number should be 4", 4, readMetadata().lastSequenceNumber());
}
@@ -115,7 +119,7 @@ public class TestSequenceNumberForV2Table extends TableTestBase {
V2Assert.assertEquals("Last sequence number should be 3", 3, readMetadata().lastSequenceNumber());
// FILE_A and FILE_B may be reordered in the manifest
- for (ManifestEntry entry : ManifestFiles.read(newManifest, FILE_IO).entries()) {
+ for (ManifestEntry<DataFile> entry : ManifestFiles.read(newManifest, FILE_IO).entries()) {
if (entry.file().path().equals(FILE_A.path())) {
V2Assert.assertEquals("FILE_A sequence number should be 1", 1, entry.sequenceNumber().longValue());
}
@@ -272,7 +276,7 @@ public class TestSequenceNumberForV2Table extends TableTestBase {
manifestFile = table.currentSnapshot().allManifests().stream()
.filter(manifest -> manifest.snapshotId() == commitId4)
.collect(Collectors.toList()).get(0);
- validateManifest(manifestFile, seqs(3, 2, 4), ids(commitId3, commitId2, commitId4), files(FILE_C, FILE_B, FILE_A));
+ validateManifest(manifestFile, seqs(4), ids(commitId4), files(FILE_A), statuses(Status.DELETED));
V2Assert.assertEquals("Snapshot sequence number should be 4", 4, snap4.sequenceNumber());
V2Assert.assertEquals("Last sequence number should be 4", 4, readMetadata().lastSequenceNumber());
}
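The tests above call a statuses(...) helper whose definition is outside this hunk. By analogy with the seqs/ids/files helpers in TableTestBase it presumably wraps varargs in an iterator; this reconstruction is an assumption and the real definition may differ:

    import java.util.Arrays;
    import java.util.Iterator;

    class StatusesHelperSketch {
      enum Status { ADDED, EXISTING, DELETED } // mirrors ManifestEntry.Status

      // hypothetical reconstruction of the statuses(...) test helper
      static Iterator<Status> statuses(Status... expected) {
        return Arrays.asList(expected).iterator();
      }
    }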