You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/03/13 16:47:10 UTC
[incubator-iceberg] branch master updated: Refactor BaseRewriteManifests to simplify writer tracking (#818)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new cb18b08 Refactor BaseRewriteManifests to simplify writer tracking (#818)
cb18b08 is described below
commit cb18b0894ae7da305e89f76e2e91267d9e26649f
Author: jun-he <ju...@users.noreply.github.com>
AuthorDate: Fri Mar 13 09:46:59 2020 -0700
Refactor BaseRewriteManifests to simplify writer tracking (#818)
---
.../org/apache/iceberg/BaseRewriteManifests.java | 73 ++++++++--------------
1 file changed, 27 insertions(+), 46 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 226df92..3c40019 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -28,11 +28,11 @@ import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
@@ -68,9 +69,9 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
private final List<ManifestFile> addedManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAddedManifests = Lists.newArrayList();
- private final List<ManifestFile> keptManifests = Collections.synchronizedList(new ArrayList<>());
- private final List<ManifestFile> newManifests = Collections.synchronizedList(new ArrayList<>());
- private final Set<ManifestFile> rewrittenManifests = Collections.synchronizedSet(new HashSet<>());
+ private final Collection<ManifestFile> keptManifests = new ConcurrentLinkedQueue<>();
+ private final Collection<ManifestFile> newManifests = new ConcurrentLinkedQueue<>();
+ private final Set<ManifestFile> rewrittenManifests = Sets.newConcurrentHashSet();
private final Map<Object, WriterWrapper> writers = Maps.newConcurrentMap();
private final AtomicInteger manifestSuffix = new AtomicInteger(0);
@@ -295,13 +296,14 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
Preconditions.checkNotNull(key, "Key cannot be null");
- WriterWrapper writer = getWriter(key);
- writer.addEntry(entry, partitionSpecId);
+ WriterWrapper writer = getWriter(key, partitionSpecId);
+ writer.addEntry(entry);
entryCount.incrementAndGet();
}
- private WriterWrapper getWriter(Object key) {
- return writers.computeIfAbsent(key, k -> new WriterWrapper());
+ private WriterWrapper getWriter(Object key, int partitionSpecId) {
+ return writers.computeIfAbsent(
+ Pair.of(key, partitionSpecId), k -> new WriterWrapper(specsById.get(partitionSpecId)));
}
@Override
@@ -325,59 +327,38 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
}
class WriterWrapper {
- private final Map<Integer, ManifestWriter> manifestWritersBySpecId = Maps.newConcurrentMap();
+ private final PartitionSpec spec;
+ private ManifestWriter writer;
- synchronized void addEntry(ManifestEntry entry, int partitionSpecId) {
- getWriter(partitionSpecId).existing(entry);
+ WriterWrapper(PartitionSpec spec) {
+ this.spec = spec;
}
- synchronized ManifestWriter getWriter(int partitionSpecId) {
- ManifestWriter writer = manifestWritersBySpecId.get(partitionSpecId);
- if (writer != null) {
- if (writer.length() < getManifestTargetSizeBytes()) {
- return writer;
- } else {
- close(partitionSpecId);
- }
+ synchronized void addEntry(ManifestEntry entry) {
+ if (writer == null) {
+ writer = newWriter();
+ } else if (writer.length() >= getManifestTargetSizeBytes()) {
+ close();
+ writer = newWriter();
}
+ writer.existing(entry);
+ }
- // create ManifestWriter with the correct partitionSpec
- PartitionSpec partitionSpec = specsById.get(partitionSpecId);
- OutputFile outputFile = manifestPath(manifestSuffix.getAndIncrement());
- writer = new ManifestWriter(partitionSpec, outputFile, snapshotId());
- manifestWritersBySpecId.put(partitionSpecId, writer);
- return writer;
+ private ManifestWriter newWriter() {
+ return new ManifestWriter(spec, manifestPath(manifestSuffix.getAndIncrement()), snapshotId());
}
- synchronized void close(int partitionSpecId) {
- if (manifestWritersBySpecId != null) {
+ synchronized void close() {
+ if (writer != null) {
try {
- ManifestWriter writer = manifestWritersBySpecId.get(partitionSpecId);
writer.close();
newManifests.add(writer.toManifestFile());
- // remove so that we will not get the closed one again.
- manifestWritersBySpecId.remove(partitionSpecId);
} catch (IOException x) {
throw new RuntimeIOException(x);
}
}
}
- synchronized void close() {
- if (manifestWritersBySpecId != null && manifestWritersBySpecId.size() > 0) {
- // close all the manifestWriters belongs to writterWrapper
- for (ManifestWriter manifestWriter : manifestWritersBySpecId.values()) {
- try {
- manifestWriter.close();
- newManifests.add(manifestWriter.toManifestFile());
- } catch (IOException x) {
- throw new RuntimeIOException(x);
- }
- }
- manifestWritersBySpecId.clear();
- }
- }
-
}
}