You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/17 19:05:02 UTC

[incubator-iceberg] branch master updated: ReplaceManifests: use writer length instead of estimate (#221)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a333ea1  ReplaceManifests: use writer length instead of estimate (#221)
a333ea1 is described below

commit a333ea1a96a8d21fb15b2bd4d55cdcc34f5b6031
Author: bryanck <br...@gmail.com>
AuthorDate: Mon Jun 17 12:04:57 2019 -0700

    ReplaceManifests: use writer length instead of estimate (#221)
---
 .../main/java/org/apache/iceberg/ReplaceManifests.java  | 17 +++++------------
 1 file changed, 5 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
index 73a0505..b43cfc6 100644
--- a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
+++ b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
@@ -168,14 +168,11 @@ public class ReplaceManifests extends SnapshotProducer<RewriteManifests> impleme
               keptManifests.add(manifest);
             } else {
               replacedManifests.add(manifest);
-              long entryNum = manifest.addedFilesCount() + manifest.existingFilesCount() + manifest.deletedFilesCount();
-              long avgEntryLen = manifest.length() / entryNum;
-
               try (ManifestReader reader =
                      ManifestReader.read(ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
                 FilteredManifest filteredManifest = reader.select(Arrays.asList("*"));
                 filteredManifest.liveEntries().forEach(
-                    entry -> appendEntry(entry, avgEntryLen, clusterByFunc.apply(entry.file()))
+                    entry -> appendEntry(entry, clusterByFunc.apply(entry.file()))
                 );
 
               } catch (IOException x) {
@@ -188,12 +185,12 @@ public class ReplaceManifests extends SnapshotProducer<RewriteManifests> impleme
     }
   }
 
-  private void appendEntry(ManifestEntry entry, long avgEntryLen, Object key) {
+  private void appendEntry(ManifestEntry entry, Object key) {
     Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
     Preconditions.checkNotNull(key, "Key cannot be null");
 
     WriterWrapper writer = getWriter(key);
-    writer.addEntry(entry, avgEntryLen);
+    writer.addEntry(entry);
     entryCount.incrementAndGet();
   }
 
@@ -226,22 +223,18 @@ public class ReplaceManifests extends SnapshotProducer<RewriteManifests> impleme
 
   class WriterWrapper {
     private ManifestWriter writer;
-    private long estimatedSize;
 
-    synchronized void addEntry(ManifestEntry entry, long len) {
+    synchronized void addEntry(ManifestEntry entry) {
       if (writer == null) {
         writer = newWriter();
-      } else if (estimatedSize >= getManifestTargetSizeBytes()) {
+      } else if (writer.length() >= getManifestTargetSizeBytes()) {
         close();
         writer = newWriter();
       }
-
       writer.existing(entry);
-      estimatedSize += len;
     }
 
     private ManifestWriter newWriter() {
-      estimatedSize = 0;
       return new ManifestWriter(spec, manifestPath(manifestCount.getAndIncrement()), snapshotId());
     }