You are viewing a plain-text version of this content. The canonical version was linked from this page; that hyperlink was not preserved in the plain-text conversion.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/12 18:01:52 UTC
[incubator-iceberg] branch master updated: Extend RewriteManifests to delete/add manifests directly (#512)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 70c1940 Extend RewriteManifests to delete/add manifests directly (#512)
70c1940 is described below
commit 70c1940209b9c444363322bf547b65d9320e0cea
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Nov 12 20:01:44 2019 +0200
Extend RewriteManifests to delete/add manifests directly (#512)
---
.../java/org/apache/iceberg/RewriteManifests.java | 24 +-
.../org/apache/iceberg/BaseRewriteManifests.java | 152 ++++++--
.../java/org/apache/iceberg/ManifestWriter.java | 29 +-
.../java/org/apache/iceberg/TableTestBase.java | 31 ++
.../org/apache/iceberg/TestRewriteManifests.java | 414 +++++++++++++++++++++
5 files changed, 612 insertions(+), 38 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/RewriteManifests.java b/api/src/main/java/org/apache/iceberg/RewriteManifests.java
index 1ed1ddf..74032eb 100644
--- a/api/src/main/java/org/apache/iceberg/RewriteManifests.java
+++ b/api/src/main/java/org/apache/iceberg/RewriteManifests.java
@@ -29,6 +29,10 @@ import java.util.function.Predicate;
* described only by the manifest files that were added, and commits that snapshot as the
* current.
* <p>
+ * This API can be used to rewrite matching manifests according to a clustering function as well as
+ * to replace specific manifests. Manifests that are deleted or added directly are ignored during
+ * the rewrite process. The set of active files in replaced manifests must be the same as in new manifests.
+ * <p>
* When committing, these changes will be applied to the latest table snapshot. Commit conflicts
* will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
*/
@@ -37,7 +41,8 @@ public interface RewriteManifests extends SnapshotUpdate<RewriteManifests> {
* Groups an existing {@link DataFile} by a cluster key produced by a function. The cluster key
* will determine which data file will be associated with a particular manifest. All data files
* with the same cluster key will be written to the same manifest (unless the file is large and
- * split into multiple files).
+ * split into multiple files). Manifests deleted via {@link #deleteManifest(ManifestFile)} or
+ * added via {@link #addManifest(ManifestFile)} are ignored during the rewrite process.
*
* @param func Function used to cluster data files to manifests.
* @return this for method chaining
@@ -54,4 +59,21 @@ public interface RewriteManifests extends SnapshotUpdate<RewriteManifests> {
* @return this for method chaining
*/
RewriteManifests rewriteIf(Predicate<ManifestFile> predicate);
+
+ /**
+ * Deletes a {@link ManifestFile manifest file} from the table.
+ *
+ * @param manifest a manifest to delete
+ * @return this for method chaining
+ */
+ RewriteManifests deleteManifest(ManifestFile manifest);
+
+ /**
+ * Adds a {@link ManifestFile manifest file} to the table. The added manifest cannot contain new
+ * or deleted files.
+ *
+ * @param manifest a manifest to add
+ * @return this for method chaining
+ */
+ RewriteManifests addManifest(ManifestFile manifest);
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index c90d8d6..4a084b9 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -20,6 +20,8 @@
package org.apache.iceberg;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
@@ -34,7 +36,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
@@ -43,27 +48,33 @@ import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFA
public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> implements RewriteManifests {
+ private static final String KEPT_MANIFESTS_COUNT = "manifests-kept";
+ private static final String CREATED_MANIFESTS_COUNT = "manifests-created";
+ private static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced";
+ private static final String PROCESSED_ENTRY_COUNT = "entries-processed";
+
+ private static final Set<ManifestEntry.Status> ALLOWED_ENTRY_STATUSES = ImmutableSet.of(
+ ManifestEntry.Status.EXISTING);
+
private final TableOperations ops;
private final PartitionSpec spec;
private final long manifestTargetSizeBytes;
+ private final Set<ManifestFile> deletedManifests = Sets.newHashSet();
+ private final List<ManifestFile> addedManifests = Lists.newArrayList();
+
private final List<ManifestFile> keptManifests = Collections.synchronizedList(new ArrayList<>());
private final List<ManifestFile> newManifests = Collections.synchronizedList(new ArrayList<>());
- private final Set<ManifestFile> replacedManifests = Collections.synchronizedSet(new HashSet<>());
+ private final Set<ManifestFile> rewrittenManifests = Collections.synchronizedSet(new HashSet<>());
private final Map<Object, WriterWrapper> writers = Collections.synchronizedMap(new HashMap<>());
private final AtomicInteger manifestSuffix = new AtomicInteger(0);
private final AtomicLong entryCount = new AtomicLong(0);
- private final Map<String, String> summaryProps = new HashMap<>();
-
private Function<DataFile, Object> clusterByFunc;
private Predicate<ManifestFile> predicate;
- private static final String REPLACED_CNT = "manifests-replaced";
- private static final String KEPT_CNT = "manifests-kept";
- private static final String NEW_CNT = "manifests-created";
- private static final String ENTRY_CNT = "entries-processed";
+ private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
BaseRewriteManifests(TableOperations ops) {
super(ops);
@@ -85,19 +96,17 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
@Override
public RewriteManifests set(String property, String value) {
- summaryProps.put(property, value);
+ summaryBuilder.set(property, value);
return this;
}
@Override
protected Map<String, String> summary() {
- Map<String, String> result = new HashMap<>();
- result.putAll(summaryProps);
- result.put(KEPT_CNT, Integer.toString(keptManifests.size()));
- result.put(NEW_CNT, Integer.toString(newManifests.size()));
- result.put(REPLACED_CNT, Integer.toString(replacedManifests.size()));
- result.put(ENTRY_CNT, Long.toString(entryCount.get()));
- return result;
+ summaryBuilder.set(KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size()));
+ summaryBuilder.set(CREATED_MANIFESTS_COUNT, String.valueOf(newManifests.size() + addedManifests.size()));
+ summaryBuilder.set(REPLACED_MANIFESTS_COUNT, String.valueOf(rewrittenManifests.size() + deletedManifests.size()));
+ summaryBuilder.set(PROCESSED_ENTRY_COUNT, String.valueOf(entryCount.get()));
+ return summaryBuilder.build();
}
@Override
@@ -113,50 +122,84 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
}
@Override
- public List<ManifestFile> apply(TableMetadata base) {
- Preconditions.checkNotNull(clusterByFunc, "clusterBy function cannot be null");
+ public RewriteManifests deleteManifest(ManifestFile manifest) {
+ deletedManifests.add(manifest);
+ return this;
+ }
+ @Override
+ public RewriteManifests addManifest(ManifestFile manifest) {
+ try {
+ // the appended manifest must be rewritten with this update's snapshot ID
+ addedManifests.add(copyManifest(manifest));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Cannot append manifest: " + e.getMessage());
+ }
+ return this;
+ }
+
+ private ManifestFile copyManifest(ManifestFile manifest) {
+ Map<Integer, PartitionSpec> specsById = ops.current().specsById();
+ try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()), specsById)) {
+ OutputFile newFile = manifestPath(manifestSuffix.getAndIncrement());
+ return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
+ }
+ }
+
+ @Override
+ public List<ManifestFile> apply(TableMetadata base) {
List<ManifestFile> currentManifests = base.currentSnapshot().manifests();
+ Set<ManifestFile> currentManifestSet = ImmutableSet.copyOf(currentManifests);
+
+ validateDeletedManifests(currentManifestSet);
- if (requiresRewrite(currentManifests)) {
- // run the rewrite process
+ if (requiresRewrite(currentManifestSet)) {
performRewrite(currentManifests);
} else {
- // just keep any new manifests that were added since the last apply(), don't rerun
- addExistingFromNewCommit(currentManifests);
+ keepActiveManifests(currentManifests);
}
+ validateFilesCounts();
+
// put new manifests at the beginning
List<ManifestFile> apply = new ArrayList<>();
apply.addAll(newManifests);
+ apply.addAll(addedManifests);
apply.addAll(keptManifests);
return apply;
}
- private boolean requiresRewrite(List<ManifestFile> currentManifests) {
- if (replacedManifests.size() == 0) {
+ private boolean requiresRewrite(Set<ManifestFile> currentManifests) {
+ if (clusterByFunc == null) {
+ // manifests are deleted and added directly so don't perform a rewrite
+ return false;
+ }
+
+ if (rewrittenManifests.size() == 0) {
// nothing yet processed so perform a full rewrite
return true;
}
+
// if any processed manifest is not in the current manifest list, perform a full rewrite
- Set<ManifestFile> set = Sets.newHashSet(currentManifests);
- return replacedManifests.stream().anyMatch(manifest -> !set.contains(manifest));
+ return rewrittenManifests.stream().anyMatch(manifest -> !currentManifests.contains(manifest));
}
- private void addExistingFromNewCommit(List<ManifestFile> currentManifests) {
+ private void keepActiveManifests(List<ManifestFile> currentManifests) {
// keep any existing manifests as-is that were not processed
keptManifests.clear();
currentManifests.stream()
- .filter(manifest -> !replacedManifests.contains(manifest))
+ .filter(manifest -> !rewrittenManifests.contains(manifest) && !deletedManifests.contains(manifest))
.forEach(manifest -> keptManifests.add(manifest));
}
private void reset() {
- cleanAll();
+ cleanUncommitted(newManifests, ImmutableSet.of());
entryCount.set(0);
keptManifests.clear();
- replacedManifests.clear();
+ rewrittenManifests.clear();
newManifests.clear();
writers.clear();
}
@@ -164,14 +207,18 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
private void performRewrite(List<ManifestFile> currentManifests) {
reset();
+ List<ManifestFile> remainingManifests = currentManifests.stream()
+ .filter(manifest -> !deletedManifests.contains(manifest))
+ .collect(Collectors.toList());
+
try {
- Tasks.foreach(currentManifests)
+ Tasks.foreach(remainingManifests)
.executeWith(ThreadPools.getWorkerPool())
.run(manifest -> {
if (predicate != null && !predicate.test(manifest)) {
keptManifests.add(manifest);
} else {
- replacedManifests.add(manifest);
+ rewrittenManifests.add(manifest);
try (ManifestReader reader =
ManifestReader.read(ops.io().newInputFile(manifest.path()), ops.current().specsById())) {
FilteredManifest filteredManifest = reader.select(Arrays.asList("*"));
@@ -189,6 +236,40 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
}
}
+ private void validateDeletedManifests(Set<ManifestFile> currentManifests) {
+ // directly deleted manifests must be still present in the current snapshot
+ deletedManifests.stream()
+ .filter(manifest -> !currentManifests.contains(manifest))
+ .findAny()
+ .ifPresent(manifest -> {
+ throw new ValidationException("Manifest is missing: %s", manifest.path());
+ });
+ }
+
+ private void validateFilesCounts() {
+ int createdManifestsFilesCount = activeFilesCount(newManifests) + activeFilesCount(addedManifests);
+ int replacedManifestsFilesCount = activeFilesCount(rewrittenManifests) + activeFilesCount(deletedManifests);
+
+ if (createdManifestsFilesCount != replacedManifestsFilesCount) {
+ throw new ValidationException(
+ "Replaced and created manifests must have the same number of active files: %d (new), %d (old)",
+ createdManifestsFilesCount, replacedManifestsFilesCount);
+ }
+ }
+
+ private int activeFilesCount(Iterable<ManifestFile> manifests) {
+ int activeFilesCount = 0;
+
+ for (ManifestFile manifest : manifests) {
+ Preconditions.checkNotNull(manifest.addedFilesCount(), "Missing file counts in %s", manifest.path());
+ Preconditions.checkNotNull(manifest.existingFilesCount(), "Missing file counts in %s", manifest.path());
+ activeFilesCount += manifest.addedFilesCount();
+ activeFilesCount += manifest.existingFilesCount();
+ }
+
+ return activeFilesCount;
+ }
+
private void appendEntry(ManifestEntry entry, Object key) {
Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
Preconditions.checkNotNull(key, "Key cannot be null");
@@ -214,8 +295,13 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
- for (ManifestFile manifest : newManifests) {
- if (!committed.contains(manifest)) {
+ cleanUncommitted(newManifests, committed);
+ cleanUncommitted(addedManifests, committed);
+ }
+
+ private void cleanUncommitted(Iterable<ManifestFile> manifests, Set<ManifestFile> committedManifests) {
+ for (ManifestFile manifest : manifests) {
+ if (!committedManifests.contains(manifest)) {
deleteFile(manifest.path());
}
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index eb53829..e7441e6 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -20,7 +20,9 @@
package org.apache.iceberg;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.io.IOException;
+import java.util.Set;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
@@ -36,14 +38,33 @@ public class ManifestWriter implements FileAppender<DataFile> {
static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
+ return copyManifest(reader, outputFile, snapshotId, summaryBuilder, Sets.newHashSet(ManifestEntry.Status.ADDED));
+ }
+
+ static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
+ SnapshotSummary.Builder summaryBuilder,
+ Set<ManifestEntry.Status> allowedEntryStatuses) {
ManifestWriter writer = new ManifestWriter(reader.spec(), outputFile, snapshotId);
boolean threw = true;
try {
for (ManifestEntry entry : reader.entries()) {
- Preconditions.checkArgument(entry.status() == ManifestEntry.Status.ADDED,
- "Cannot append manifest: contains existing files");
- summaryBuilder.addedFile(reader.spec(), entry.file());
- writer.add(entry);
+ Preconditions.checkArgument(
+ allowedEntryStatuses.contains(entry.status()),
+ "Invalid manifest entry status: %s (allowed statuses: %s)",
+ entry.status(), allowedEntryStatuses);
+ switch (entry.status()) {
+ case ADDED:
+ summaryBuilder.addedFile(reader.spec(), entry.file());
+ writer.add(entry);
+ break;
+ case EXISTING:
+ writer.existing(entry);
+ break;
+ case DELETED:
+ summaryBuilder.deletedFile(reader.spec(), entry.file());
+ writer.delete(entry);
+ break;
+ }
}
threw = false;
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 7857a99..9f98c21 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -140,6 +140,37 @@ public class TableTestBase {
return writer.toManifestFile();
}
+ ManifestFile writeManifest(String fileName, ManifestEntry... entries) throws IOException {
+ File manifestFile = temp.newFile(fileName);
+ Assert.assertTrue(manifestFile.delete());
+ OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
+
+ ManifestWriter writer = ManifestWriter.write(table.spec(), outputFile);
+ try {
+ for (ManifestEntry entry : entries) {
+ writer.addEntry(entry);
+ }
+ } finally {
+ writer.close();
+ }
+
+ return writer.toManifestFile();
+ }
+
+ ManifestEntry manifestEntry(ManifestEntry.Status status, long snapshotId, DataFile file) {
+ ManifestEntry entry = new ManifestEntry(table.spec().partitionType());
+ switch (status) {
+ case ADDED:
+ return entry.wrapAppend(snapshotId, file);
+ case EXISTING:
+ return entry.wrapExisting(snapshotId, file);
+ case DELETED:
+ return entry.wrapDelete(snapshotId, file);
+ default:
+ throw new IllegalArgumentException("Unexpected entry status: " + status);
+ }
+ }
+
void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
List<ManifestFile> oldManifests = old != null ? old.manifests() : ImmutableList.of();
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index 669722a..340bc09 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -19,15 +19,19 @@
package org.apache.iceberg;
+import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.iceberg.Files.localInput;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -329,4 +333,414 @@ public class TestRewriteManifests extends TableTestBase {
files(FILE_A),
statuses(ManifestEntry.Status.EXISTING));
}
+
+ @Test
+ public void testBasicManifestReplacement() throws IOException {
+ Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+ List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+ Assert.assertEquals(1, firstSnapshotManifests.size());
+ ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+ table.newFastAppend()
+ .appendFile(FILE_C)
+ .appendFile(FILE_D)
+ .commit();
+ Snapshot secondSnapshot = table.currentSnapshot();
+
+ ManifestFile firstNewManifest = writeManifest(
+ "manifest-file-1.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+ ManifestFile secondNewManifest = writeManifest(
+ "manifest-file-2.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
+
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+ rewriteManifests.deleteManifest(firstSnapshotManifest);
+ rewriteManifests.addManifest(firstNewManifest);
+ rewriteManifests.addManifest(secondNewManifest);
+ rewriteManifests.commit();
+
+ Snapshot snapshot = table.currentSnapshot();
+ List<ManifestFile> manifests = snapshot.manifests();
+ Assert.assertEquals(3, manifests.size());
+
+ validateSummary(snapshot, 1, 1, 2, 0);
+
+ validateManifestEntries(
+ manifests.get(0),
+ ids(firstSnapshot.snapshotId()),
+ files(FILE_A),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ validateManifestEntries(
+ manifests.get(1),
+ ids(firstSnapshot.snapshotId()),
+ files(FILE_B),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ validateManifestEntries(
+ manifests.get(2),
+ ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()),
+ files(FILE_C, FILE_D),
+ statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+ }
+
+ @Test
+ public void testManifestReplacementConcurrentAppend() throws IOException {
+ Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+ List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+ Assert.assertEquals(1, firstSnapshotManifests.size());
+ ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+ ManifestFile firstNewManifest = writeManifest(
+ "manifest-file-1.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+ ManifestFile secondNewManifest = writeManifest(
+ "manifest-file-2.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
+
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+ rewriteManifests.deleteManifest(firstSnapshotManifest);
+ rewriteManifests.addManifest(firstNewManifest);
+ rewriteManifests.addManifest(secondNewManifest);
+
+ table.newFastAppend()
+ .appendFile(FILE_C)
+ .appendFile(FILE_D)
+ .commit();
+ Snapshot secondSnapshot = table.currentSnapshot();
+
+ Assert.assertEquals(2, table.currentSnapshot().manifests().size());
+
+ rewriteManifests.commit();
+
+ Snapshot snapshot = table.currentSnapshot();
+ List<ManifestFile> manifests = snapshot.manifests();
+ Assert.assertEquals(3, manifests.size());
+
+ validateSummary(snapshot, 1, 1, 2, 0);
+
+ validateManifestEntries(
+ manifests.get(0),
+ ids(firstSnapshot.snapshotId()),
+ files(FILE_A),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ validateManifestEntries(
+ manifests.get(1),
+ ids(firstSnapshot.snapshotId()),
+ files(FILE_B),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ validateManifestEntries(
+ manifests.get(2),
+ ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()),
+ files(FILE_C, FILE_D),
+ statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED));
+ }
+
+ @Test
+ public void testManifestReplacementConcurrentDelete() throws IOException {
+ Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+ table.updateProperties()
+ .set(MANIFEST_MERGE_ENABLED, "false")
+ .commit();
+
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+ List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+ Assert.assertEquals(1, firstSnapshotManifests.size());
+ ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+ table.newFastAppend()
+ .appendFile(FILE_C)
+ .appendFile(FILE_D)
+ .commit();
+ long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+ ManifestFile firstNewManifest = writeManifest(
+ "manifest-file-1.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+ ManifestFile secondNewManifest = writeManifest(
+ "manifest-file-2.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
+
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+ rewriteManifests.deleteManifest(firstSnapshotManifest);
+ rewriteManifests.addManifest(firstNewManifest);
+ rewriteManifests.addManifest(secondNewManifest);
+
+ table.newDelete()
+ .deleteFile(FILE_C)
+ .commit();
+ long thirdSnapshotId = table.currentSnapshot().snapshotId();
+
+ rewriteManifests.commit();
+
+ Snapshot snapshot = table.currentSnapshot();
+ List<ManifestFile> manifests = snapshot.manifests();
+ Assert.assertEquals(3, manifests.size());
+
+ validateSummary(snapshot, 1, 1, 2, 0);
+
+ validateManifestEntries(
+ manifests.get(0),
+ ids(firstSnapshot.snapshotId()),
+ files(FILE_A),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ validateManifestEntries(
+ manifests.get(1),
+ ids(firstSnapshot.snapshotId()),
+ files(FILE_B),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ validateManifestEntries(
+ manifests.get(2),
+ ids(thirdSnapshotId, secondSnapshotId),
+ files(FILE_C, FILE_D),
+ statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
+ }
+
+ @Test
+ public void testManifestReplacementConcurrentConflictingDelete() throws IOException {
+ Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+ List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+ Assert.assertEquals(1, firstSnapshotManifests.size());
+ ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+ ManifestFile firstNewManifest = writeManifest(
+ "manifest-file-1.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+ ManifestFile secondNewManifest = writeManifest(
+ "manifest-file-2.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B));
+
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+ rewriteManifests.deleteManifest(firstSnapshotManifest);
+ rewriteManifests.addManifest(firstNewManifest);
+ rewriteManifests.addManifest(secondNewManifest);
+
+ table.newDelete()
+ .deleteFile(FILE_A)
+ .commit();
+
+ AssertHelpers.assertThrows("Should reject commit",
+ ValidationException.class, "Manifest is missing",
+ rewriteManifests::commit);
+ }
+
+ @Test
+ public void testManifestReplacementCombinedWithRewrite() throws IOException {
+ Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+ List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+ Assert.assertEquals(1, firstSnapshotManifests.size());
+ ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+ table.newFastAppend()
+ .appendFile(FILE_B)
+ .commit();
+
+ Snapshot secondSnapshot = table.currentSnapshot();
+
+ table.newFastAppend()
+ .appendFile(FILE_C)
+ .commit();
+
+ table.newFastAppend()
+ .appendFile(FILE_D)
+ .commit();
+
+ Assert.assertEquals(4, Iterables.size(table.snapshots()));
+
+ ManifestFile newManifest = writeManifest(
+ "manifest-file-1.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+
+ table.rewriteManifests()
+ .deleteManifest(firstSnapshotManifest)
+ .addManifest(newManifest)
+ .clusterBy(dataFile -> "const-value")
+ .rewriteIf(manifest -> {
+ try (ManifestReader reader = ManifestReader.read(localInput(manifest.path()))) {
+ return !reader.iterator().next().path().equals(FILE_B.path());
+ } catch (IOException x) {
+ throw new RuntimeIOException(x);
+ }
+ })
+ .commit();
+
+ Snapshot snapshot = table.currentSnapshot();
+ List<ManifestFile> manifests = snapshot.manifests();
+ Assert.assertEquals(3, manifests.size());
+
+ validateSummary(snapshot, 3, 1, 2, 2);
+
+ validateManifestEntries(
+ manifests.get(1),
+ ids(firstSnapshot.snapshotId()),
+ files(FILE_A),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ validateManifestEntries(
+ manifests.get(2),
+ ids(secondSnapshot.snapshotId()),
+ files(FILE_B),
+ statuses(ManifestEntry.Status.ADDED));
+ }
+
+ @Test
+ public void testManifestReplacementCombinedWithRewriteConcurrentDelete() throws IOException {
+ Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+ table.updateProperties()
+ .set(MANIFEST_MERGE_ENABLED, "false")
+ .commit();
+
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ Snapshot firstSnapshot = table.currentSnapshot();
+ List<ManifestFile> firstSnapshotManifests = firstSnapshot.manifests();
+ Assert.assertEquals(1, firstSnapshotManifests.size());
+ ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0);
+
+ table.newFastAppend()
+ .appendFile(FILE_B)
+ .commit();
+
+ Snapshot secondSnapshot = table.currentSnapshot();
+
+ table.newFastAppend()
+ .appendFile(FILE_C)
+ .commit();
+
+ Assert.assertEquals(3, Iterables.size(table.snapshots()));
+
+ ManifestFile newManifest = writeManifest(
+ "manifest-file-1.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A));
+
+ RewriteManifests rewriteManifests = table.rewriteManifests()
+ .deleteManifest(firstSnapshotManifest)
+ .addManifest(newManifest)
+ .clusterBy(dataFile -> "const-value");
+
+ rewriteManifests.apply();
+
+ table.newDelete()
+ .deleteFile(FILE_C)
+ .commit();
+
+ rewriteManifests.commit();
+
+ Snapshot snapshot = table.currentSnapshot();
+ List<ManifestFile> manifests = snapshot.manifests();
+ Assert.assertEquals(2, manifests.size());
+
+ validateSummary(snapshot, 3, 0, 2, 1);
+
+ validateManifestEntries(
+ manifests.get(0),
+ ids(secondSnapshot.snapshotId()),
+ files(FILE_B),
+ statuses(ManifestEntry.Status.EXISTING));
+
+ validateManifestEntries(
+ manifests.get(1),
+ ids(firstSnapshot.snapshotId()),
+ files(FILE_A),
+ statuses(ManifestEntry.Status.EXISTING));
+ }
+
+ @Test
+ public void testInvalidUsage() throws IOException {
+ Assert.assertNull("Table should be empty", table.currentSnapshot());
+
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ Snapshot snapshot = table.currentSnapshot();
+ List<ManifestFile> manifests = snapshot.manifests();
+ Assert.assertEquals(1, manifests.size());
+ ManifestFile manifest = manifests.get(0);
+
+ ManifestFile invalidAddedFileManifest = writeManifest(
+ "manifest-file-2.avro",
+ manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A));
+
+ AssertHelpers.assertThrows("Should reject commit",
+ IllegalArgumentException.class, "Cannot append manifest: Invalid manifest",
+ () -> table.rewriteManifests()
+ .deleteManifest(manifest)
+ .addManifest(invalidAddedFileManifest)
+ .commit());
+
+ ManifestFile invalidDeletedFileManifest = writeManifest(
+ "manifest-file-3.avro",
+ manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A));
+
+ AssertHelpers.assertThrows("Should reject commit",
+ IllegalArgumentException.class, "Cannot append manifest: Invalid manifest",
+ () -> table.rewriteManifests()
+ .deleteManifest(manifest)
+ .addManifest(invalidDeletedFileManifest)
+ .commit());
+
+ AssertHelpers.assertThrows("Should reject commit",
+ ValidationException.class, "must have the same number of active files",
+ () -> table.rewriteManifests()
+ .deleteManifest(manifest)
+ .commit());
+ }
+
+ private void validateSummary(Snapshot snapshot, int replaced, int kept, int created, int entryCount) {
+ Map<String, String> summary = snapshot.summary();
+ Assert.assertEquals(
+ "Replaced manifest count should match",
+ replaced, Integer.parseInt(summary.get("manifests-replaced")));
+ Assert.assertEquals(
+ "Kept manifest count should match",
+ kept, Integer.parseInt(summary.get("manifests-kept")));
+ Assert.assertEquals(
+ "Created manifest count should match",
+ created, Integer.parseInt(summary.get("manifests-created")));
+ Assert.assertEquals(
+ "Entry count should match",
+ entryCount, Integer.parseInt(summary.get("entries-processed")));
+ }
}