You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/27 19:52:51 UTC
[2/2] incubator-beam git commit: Use a NavigableSet Instead of a
PriorityQueue in WatermarkManager
Use a NavigableSet Instead of a PriorityQueue in WatermarkManager
This removes an O(n) call to PriorityQueue#remove, replacing it with an O(log(n))
call to TreeSet#remove. This significantly improves the scaling behavior of the DirectRunner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aeb3b3c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aeb3b3c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aeb3b3c4
Branch: refs/heads/master
Commit: aeb3b3c4bfad3e02090b1f7f62695759e17f0189
Parents: 4cb1d10
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 26 16:35:31 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 27 12:51:32 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/WatermarkManager.java | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aeb3b3c4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f8cbc51..31b8091 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -39,7 +39,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
-import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -388,7 +387,7 @@ public class WatermarkManager {
private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers;
private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers;
- private final PriorityQueue<TimerData> pendingTimers;
+ private final NavigableSet<TimerData> pendingTimers;
private AtomicReference<Instant> earliestHold;
@@ -397,7 +396,7 @@ public class WatermarkManager {
this.pendingBundles = new HashSet<>();
this.processingTimers = new HashMap<>();
this.synchronizedProcessingTimers = new HashMap<>();
- this.pendingTimers = new PriorityQueue<>();
+ this.pendingTimers = new TreeSet<>();
Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (Watermark wm : inputWms) {
initialHold = INSTANT_ORDERING.min(initialHold, wm.get());
@@ -466,7 +465,7 @@ public class WatermarkManager {
}
}
if (!pendingTimers.isEmpty()) {
- earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest);
+ earliest = INSTANT_ORDERING.min(pendingTimers.first().getTimestamp(), earliest);
}
return earliest;
}
@@ -630,7 +629,7 @@ public class WatermarkManager {
private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
/**
- * For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the
+ * For each (Object, NavigableSet) pair in the provided map, remove each Timer that is before the
* latestTime argument and put in in the result with the same key, then remove all of the keys
* which have no more pending timers.
*
@@ -1003,11 +1002,11 @@ public class WatermarkManager {
private static class PerKeyHolds {
private final Map<Object, KeyedHold> keyedHolds;
- private final PriorityQueue<KeyedHold> allHolds;
+ private final NavigableSet<KeyedHold> allHolds;
private PerKeyHolds() {
this.keyedHolds = new HashMap<>();
- this.allHolds = new PriorityQueue<>();
+ this.allHolds = new TreeSet<>();
}
/**
@@ -1015,7 +1014,7 @@ public class WatermarkManager {
* there are no holds within this {@link PerKeyHolds}.
*/
public Instant getMinHold() {
- return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.peek().getTimestamp();
+ return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.first().getTimestamp();
}
/**
@@ -1026,7 +1025,7 @@ public class WatermarkManager {
removeHold(key);
KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
keyedHolds.put(key, newKeyedHold);
- allHolds.offer(newKeyedHold);
+ allHolds.add(newKeyedHold);
}
/**