You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text version.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/03/13 16:47:10 UTC

[incubator-iceberg] branch master updated: Refactor BaseRewriteManifests to simplify writer tracking (#818)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new cb18b08  Refactor BaseRewriteManifests to simplify writer tracking (#818)
cb18b08 is described below

commit cb18b0894ae7da305e89f76e2e91267d9e26649f
Author: jun-he <ju...@users.noreply.github.com>
AuthorDate: Fri Mar 13 09:46:59 2020 -0700

    Refactor BaseRewriteManifests to simplify writer tracking (#818)
---
 .../org/apache/iceberg/BaseRewriteManifests.java   | 73 ++++++++--------------
 1 file changed, 27 insertions(+), 46 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 226df92..3c40019 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -28,11 +28,11 @@ import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -68,9 +69,9 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
   private final List<ManifestFile> addedManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAddedManifests = Lists.newArrayList();
 
-  private final List<ManifestFile> keptManifests = Collections.synchronizedList(new ArrayList<>());
-  private final List<ManifestFile> newManifests = Collections.synchronizedList(new ArrayList<>());
-  private final Set<ManifestFile> rewrittenManifests = Collections.synchronizedSet(new HashSet<>());
+  private final Collection<ManifestFile> keptManifests = new ConcurrentLinkedQueue<>();
+  private final Collection<ManifestFile> newManifests = new ConcurrentLinkedQueue<>();
+  private final Set<ManifestFile> rewrittenManifests = Sets.newConcurrentHashSet();
   private final Map<Object, WriterWrapper> writers = Maps.newConcurrentMap();
 
   private final AtomicInteger manifestSuffix = new AtomicInteger(0);
@@ -295,13 +296,14 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
     Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
     Preconditions.checkNotNull(key, "Key cannot be null");
 
-    WriterWrapper writer = getWriter(key);
-    writer.addEntry(entry, partitionSpecId);
+    WriterWrapper writer = getWriter(key, partitionSpecId);
+    writer.addEntry(entry);
     entryCount.incrementAndGet();
   }
 
-  private WriterWrapper getWriter(Object key) {
-    return writers.computeIfAbsent(key, k -> new WriterWrapper());
+  private WriterWrapper getWriter(Object key, int partitionSpecId) {
+    return writers.computeIfAbsent(
+        Pair.of(key, partitionSpecId), k -> new WriterWrapper(specsById.get(partitionSpecId)));
   }
 
   @Override
@@ -325,59 +327,38 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
   }
 
   class WriterWrapper {
-    private final Map<Integer, ManifestWriter> manifestWritersBySpecId = Maps.newConcurrentMap();
+    private final PartitionSpec spec;
+    private ManifestWriter writer;
 
-    synchronized void addEntry(ManifestEntry entry, int partitionSpecId) {
-      getWriter(partitionSpecId).existing(entry);
+    WriterWrapper(PartitionSpec spec) {
+      this.spec = spec;
     }
 
-    synchronized ManifestWriter getWriter(int partitionSpecId) {
-      ManifestWriter writer = manifestWritersBySpecId.get(partitionSpecId);
-      if (writer != null) {
-        if (writer.length() < getManifestTargetSizeBytes()) {
-          return writer;
-        } else {
-          close(partitionSpecId);
-        }
+    synchronized void addEntry(ManifestEntry entry) {
+      if (writer == null) {
+        writer = newWriter();
+      } else if (writer.length() >= getManifestTargetSizeBytes()) {
+        close();
+        writer = newWriter();
       }
+      writer.existing(entry);
+    }
 
-      // create ManifestWriter with the correct partitionSpec
-      PartitionSpec partitionSpec = specsById.get(partitionSpecId);
-      OutputFile outputFile = manifestPath(manifestSuffix.getAndIncrement());
-      writer = new ManifestWriter(partitionSpec, outputFile, snapshotId());
-      manifestWritersBySpecId.put(partitionSpecId, writer);
-      return writer;
+    private ManifestWriter newWriter() {
+      return new ManifestWriter(spec, manifestPath(manifestSuffix.getAndIncrement()), snapshotId());
     }
 
-    synchronized void close(int partitionSpecId) {
-      if (manifestWritersBySpecId != null) {
+    synchronized void close() {
+      if (writer != null) {
         try {
-          ManifestWriter writer = manifestWritersBySpecId.get(partitionSpecId);
           writer.close();
           newManifests.add(writer.toManifestFile());
-          // remove so that we will not get the closed one again.
-          manifestWritersBySpecId.remove(partitionSpecId);
         } catch (IOException x) {
           throw new RuntimeIOException(x);
         }
       }
     }
 
-    synchronized void close() {
-      if (manifestWritersBySpecId != null && manifestWritersBySpecId.size() > 0) {
-        // close all the manifestWriters belongs to writterWrapper
-        for (ManifestWriter manifestWriter : manifestWritersBySpecId.values()) {
-          try {
-            manifestWriter.close();
-            newManifests.add(manifestWriter.toManifestFile());
-          } catch (IOException x) {
-            throw new RuntimeIOException(x);
-          }
-        }
-        manifestWritersBySpecId.clear();
-      }
-    }
-
   }
 
 }