You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/16 21:08:09 UTC
[incubator-iceberg] branch master updated: Do not merge appended
manifests under min count (#524)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 83ee9f4 Do not merge appended manifests under min count (#524)
83ee9f4 is described below
commit 83ee9f4f568cc19cf2d5b1f7c0f8133d1a44f5ab
Author: manishmalhotrawork <ma...@gmail.com>
AuthorDate: Sat Nov 16 13:08:03 2019 -0800
Do not merge appended manifests under min count (#524)
---
.../apache/iceberg/MergingSnapshotProducer.java | 18 +++++--
.../java/org/apache/iceberg/TableTestBase.java | 17 ++++++
.../java/org/apache/iceberg/TestMergeAppend.java | 62 ++++++++++++++++++++++
3 files changed, 92 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index f674033..bb6df4a 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -98,6 +98,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// cache the new manifest once it is written
private ManifestFile cachedNewManifest = null;
+ private ManifestFile firstAppendedManifest = null;
private boolean hasNewFiles = false;
// cache merge results to reuse when retrying
@@ -205,8 +206,14 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// the manifest must be rewritten with this update's snapshot ID
try (ManifestReader reader = ManifestReader.read(
ops.io().newInputFile(manifest.path()), ops.current().specsById())) {
- appendManifests.add(ManifestWriter.copyAppendManifest(
- reader, manifestPath(manifestCount.getAndIncrement()), snapshotId(), appendedManifestsSummary));
+ ManifestFile manifestFile = ManifestWriter.copyAppendManifest(
+ reader, manifestPath(manifestCount.getAndIncrement()), snapshotId(), appendedManifestsSummary);
+ appendManifests.add(manifestFile);
+ // keep a reference to the first appended manifest so that we can avoid merging the bin(s)
+ // that contain it while they hold fewer than minManifestsCountToMerge manifests
+ if (firstAppendedManifest == null) {
+ firstAppendedManifest = manifestFile;
+ }
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
}
@@ -589,10 +596,11 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
return;
}
- // if the bin has a new manifest (the new data files) then only merge it if the number of
- // manifests is above the minimum count. this is applied only to bins with an in-memory
+ // if the bin has a new manifest (the new data files) or the first appended manifest, then only merge it
+ // if the number of manifests is at least the minimum count. this is applied only to bins with an in-memory
// manifest so that large manifests don't prevent merging older groups.
- if (bin.contains(cachedNewManifest) && bin.size() < minManifestsCountToMerge) {
+ if ((bin.contains(cachedNewManifest) || bin.contains(firstAppendedManifest)) &&
+ bin.size() < minManifestsCountToMerge) {
// not enough to merge, add all manifest files to the output list
outputManifests.addAll(bin);
} else {
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 9f98c21..59c7cc7 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -157,6 +157,23 @@ public class TableTestBase {
return writer.toManifestFile();
}
+ /**
+  * Writes the given data files into a new manifest whose file name is {@code name + ".avro"}.
+  *
+  * @param name base name of the manifest file, without the ".avro" suffix
+  * @param files data files to add to the manifest, in order
+  * @return the {@link ManifestFile} reported by the writer once it has been closed
+  * @throws IOException propagated from the temp-file and writer calls
+  */
+ ManifestFile writeManifestWithName(String name, DataFile... files) throws IOException {
+   File target = temp.newFile(name + ".avro");
+   // the delete is asserted to succeed, so only the path of the temp file is reused by the writer
+   Assert.assertTrue(target.delete());
+   OutputFile sink = table.ops().io().newOutputFile(target.getCanonicalPath());
+
+   ManifestWriter manifestWriter = ManifestWriter.write(table.spec(), sink);
+   try {
+     for (int i = 0; i < files.length; i += 1) {
+       manifestWriter.add(files[i]);
+     }
+   } finally {
+     // always close, even if adding a file fails; toManifestFile() is only called after close
+     manifestWriter.close();
+   }
+
+   return manifestWriter.toManifestFile();
+ }
+
ManifestEntry manifestEntry(ManifestEntry.Status status, long snapshotId, DataFile file) {
ManifestEntry entry = new ManifestEntry(table.spec().partitionType());
switch (status) {
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 32a9137..01d2b7f 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -157,6 +157,68 @@ public class TestMergeAppend extends TableTestBase {
}
@Test
+ public void testManifestMergeMinCount() throws IOException {
+   Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+   table.updateProperties()
+       .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2")
+       // per the original author: each manifest file is 4554 bytes, so a 10000-byte target size
+       // gives 2 bins for the 3 manifest/data files
+       .set(TableProperties.MANIFEST_TARGET_SIZE_BYTES, "10000")
+       .commit();
+
+   Assert.assertNull("Should not have a current snapshot", readMetadata().currentSnapshot());
+
+   ManifestFile manifestA = writeManifest(FILE_A);
+   ManifestFile manifestC = writeManifestWithName("FILE_C", FILE_C);
+   ManifestFile manifestD = writeManifestWithName("FILE_D", FILE_D);
+
+   // first write: three appended manifests are expected to end up as two
+   table.newAppend()
+       .appendManifest(manifestA)
+       .appendManifest(manifestC)
+       .appendManifest(manifestD)
+       .commit();
+
+   Snapshot firstWrite = readMetadata().currentSnapshot();
+   Assert.assertEquals("Should contain 2 merged manifest for first write",
+       2, firstWrite.manifests().size());
+
+   // second write: append the same three manifests again
+   table.newAppend()
+       .appendManifest(manifestA)
+       .appendManifest(manifestC)
+       .appendManifest(manifestD)
+       .commit();
+
+   Snapshot secondWrite = readMetadata().currentSnapshot();
+   Assert.assertEquals("Should contain 3 merged manifest for second write",
+       3, secondWrite.manifests().size());
+
+   // validate that the metadata summary is correct when using appendManifest
+   Assert.assertEquals("Summary metadata should include 3 added files",
+       "3", secondWrite.summary().get("added-data-files"));
+ }
+
+ @Test
+ public void testManifestDoNotMergeMinCount() throws IOException {
+   Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+   // min count to merge (4) is larger than the 3 manifests appended below
+   table.updateProperties().set("commit.manifest.min-count-to-merge", "4").commit();
+
+   TableMetadata base = readMetadata();
+   Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+   // FILE_A and FILE_B share one manifest, so 3 manifests carry 4 data files in total
+   ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+   ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C);
+   ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D);
+   Snapshot pending = table.newAppend()
+       .appendManifest(manifest)
+       .appendManifest(manifest2)
+       .appendManifest(manifest3)
+       .apply();
+
+   // the appended manifests must be kept as-is: the message now states what is actually asserted
+   Assert.assertEquals("Should contain 3 unmerged manifests after the first write",
+       3, pending.manifests().size());
+
+   // validate that the metadata summary is correct when using appendManifest
+   Assert.assertEquals("Summary metadata should include 4 added files",
+       "4", pending.summary().get("added-data-files"));
+ }
+
+ @Test
public void testMergeWithExistingManifestAfterDelete() {
// merge all manifests for this test
table.updateProperties().set("commit.manifest.min-count-to-merge", "1").commit();