You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/16 21:08:09 UTC

[incubator-iceberg] branch master updated: Do not merge appended manifests under min count (#524)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 83ee9f4  Do not merge appended manifests under min count (#524)
83ee9f4 is described below

commit 83ee9f4f568cc19cf2d5b1f7c0f8133d1a44f5ab
Author: manishmalhotrawork <ma...@gmail.com>
AuthorDate: Sat Nov 16 13:08:03 2019 -0800

    Do not merge appended manifests under min count (#524)
---
 .../apache/iceberg/MergingSnapshotProducer.java    | 18 +++++--
 .../java/org/apache/iceberg/TableTestBase.java     | 17 ++++++
 .../java/org/apache/iceberg/TestMergeAppend.java   | 62 ++++++++++++++++++++++
 3 files changed, 92 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index f674033..bb6df4a 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -98,6 +98,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   // cache the new manifest once it is written
   private ManifestFile cachedNewManifest = null;
+  private ManifestFile firstAppendedManifest = null;
   private boolean hasNewFiles = false;
 
   // cache merge results to reuse when retrying
@@ -205,8 +206,14 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     // the manifest must be rewritten with this update's snapshot ID
     try (ManifestReader reader = ManifestReader.read(
         ops.io().newInputFile(manifest.path()), ops.current().specsById())) {
-      appendManifests.add(ManifestWriter.copyAppendManifest(
-          reader, manifestPath(manifestCount.getAndIncrement()), snapshotId(), appendedManifestsSummary));
+      ManifestFile manifestFile = ManifestWriter.copyAppendManifest(
+          reader, manifestPath(manifestCount.getAndIncrement()), snapshotId(), appendedManifestsSummary);
+      appendManifests.add(manifestFile);
+      // keep reference of the first appended manifest, so that we can avoid merging first bin(s)
+      // which has the first appended manifest and have not crossed the limit of minManifestsCountToMerge
+      if (firstAppendedManifest == null) {
+        firstAppendedManifest = manifestFile;
+      }
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
     }
@@ -589,10 +596,11 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
             return;
           }
 
-          // if the bin has a new manifest (the new data files) then only merge it if the number of
-          // manifests is above the minimum count. this is applied only to bins with an in-memory
+          // if the bin has a new manifest (the new data files) or appended manifest file then only merge it
+          // if the number of manifests is above the minimum count. this is applied only to bins with an in-memory
           // manifest so that large manifests don't prevent merging older groups.
-          if (bin.contains(cachedNewManifest) && bin.size() < minManifestsCountToMerge) {
+          if ((bin.contains(cachedNewManifest) || bin.contains(firstAppendedManifest)) &&
+              bin.size() < minManifestsCountToMerge) {
             // not enough to merge, add all manifest files to the output list
             outputManifests.addAll(bin);
           } else {
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 9f98c21..59c7cc7 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -157,6 +157,23 @@ public class TableTestBase {
     return writer.toManifestFile();
   }
 
+  ManifestFile writeManifestWithName(String name, DataFile... files) throws IOException {
+    // temp.newFile creates the file; it is deleted (presumably so the writer can create it itself)
+    File target = temp.newFile(name + ".avro");
+    Assert.assertTrue(target.delete());
+    ManifestWriter writer = ManifestWriter.write(
+        table.spec(), table.ops().io().newOutputFile(target.getCanonicalPath()));
+    try {
+      for (DataFile dataFile : files) {
+        writer.add(dataFile);
+      }
+    } finally {
+      writer.close();
+    }
+    // toManifestFile is only requested after the writer has been closed
+    return writer.toManifestFile();
+  }
+
   ManifestEntry manifestEntry(ManifestEntry.Status status, long snapshotId, DataFile file) {
     ManifestEntry entry = new ManifestEntry(table.spec().partitionType());
     switch (status) {
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 32a9137..01d2b7f 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -157,6 +157,68 @@ public class TestMergeAppend extends TableTestBase {
   }
 
   @Test
+  public void testManifestMergeMinCount() throws IOException {
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+    table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2")
+        // each manifest file is ~4554 bytes, so a 10000-byte target packs the 3 manifests into 2 bins
+        .set(TableProperties.MANIFEST_TARGET_SIZE_BYTES, "10000")
+        .commit();
+
+    TableMetadata base = readMetadata();
+    Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+    ManifestFile manifest = writeManifest(FILE_A);
+    ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C);
+    ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D);
+    table.newAppend()
+        .appendManifest(manifest)
+        .appendManifest(manifest2)
+        .appendManifest(manifest3)
+        .commit();
+
+    Assert.assertEquals("Should contain 2 manifests after first write",
+        2, readMetadata().currentSnapshot().manifests().size());
+
+    table.newAppend()
+        .appendManifest(manifest)
+        .appendManifest(manifest2)
+        .appendManifest(manifest3)
+        .commit();
+
+    Assert.assertEquals("Should contain 3 manifests after second write",
+        3, readMetadata().currentSnapshot().manifests().size());
+
+    // validate that the metadata summary is correct when using appendManifest
+    Assert.assertEquals("Summary metadata should include 3 added files",
+        "3", readMetadata().currentSnapshot().summary().get("added-data-files"));
+  }
+
+  @Test
+  public void testManifestDoNotMergeMinCount() throws IOException {
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+    table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "4").commit();
+
+    TableMetadata base = readMetadata();
+    Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+
+    ManifestFile manifest = writeManifest(FILE_A, FILE_B);
+    ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C);
+    ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D);
+    Snapshot pending = table.newAppend()
+        .appendManifest(manifest)
+        .appendManifest(manifest2)
+        .appendManifest(manifest3)
+        .apply();
+
+    Assert.assertEquals("Should contain 3 unmerged manifests after first write",
+        3, pending.manifests().size());
+
+    // validate that the metadata summary is correct when using appendManifest
+    Assert.assertEquals("Summary metadata should include 4 added files",
+        "4", pending.summary().get("added-data-files"));
+  }
+
+  @Test
   public void testMergeWithExistingManifestAfterDelete() {
     // merge all manifests for this test
     table.updateProperties().set("commit.manifest.min-count-to-merge", "1").commit();