You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/27 19:52:50 UTC

[1/2] incubator-beam git commit: This closes #1202

Repository: incubator-beam
Updated Branches:
  refs/heads/master 4cb1d10df -> 3fd3951de


This closes #1202


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3fd3951d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3fd3951d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3fd3951d

Branch: refs/heads/master
Commit: 3fd3951ded4566b9405f4a4246be7b7e8be86d3d
Parents: 4cb1d10 aeb3b3c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 27 12:51:32 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 27 12:51:32 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/WatermarkManager.java      | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Use a NavigableSet Instead of a PriorityQueue in WatermarkManager

Posted by ke...@apache.org.
Use a NavigableSet Instead of a PriorityQueue in WatermarkManager

This removes an O(n) call to remove, replacing it with an O(log(n))
call. This significantly improves scaling behavior of the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aeb3b3c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aeb3b3c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aeb3b3c4

Branch: refs/heads/master
Commit: aeb3b3c4bfad3e02090b1f7f62695759e17f0189
Parents: 4cb1d10
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 26 16:35:31 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 27 12:51:32 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/WatermarkManager.java      | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aeb3b3c4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f8cbc51..31b8091 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -39,7 +39,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Objects;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -388,7 +387,7 @@ public class WatermarkManager {
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers;
 
-    private final PriorityQueue<TimerData> pendingTimers;
+    private final NavigableSet<TimerData> pendingTimers;
 
     private AtomicReference<Instant> earliestHold;
 
@@ -397,7 +396,7 @@ public class WatermarkManager {
       this.pendingBundles = new HashSet<>();
       this.processingTimers = new HashMap<>();
       this.synchronizedProcessingTimers = new HashMap<>();
-      this.pendingTimers = new PriorityQueue<>();
+      this.pendingTimers = new TreeSet<>();
       Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
       for (Watermark wm : inputWms) {
         initialHold = INSTANT_ORDERING.min(initialHold, wm.get());
@@ -466,7 +465,7 @@ public class WatermarkManager {
         }
       }
       if (!pendingTimers.isEmpty()) {
-        earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest);
+        earliest = INSTANT_ORDERING.min(pendingTimers.first().getTimestamp(), earliest);
       }
       return earliest;
     }
@@ -630,7 +629,7 @@ public class WatermarkManager {
   private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
 
   /**
-   * For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the
+   * For each (Object, NavigableSet) pair in the provided map, remove each Timer that is before the
    * latestTime argument and put in in the result with the same key, then remove all of the keys
    * which have no more pending timers.
    *
@@ -1003,11 +1002,11 @@ public class WatermarkManager {
 
   private static class PerKeyHolds {
     private final Map<Object, KeyedHold> keyedHolds;
-    private final PriorityQueue<KeyedHold> allHolds;
+    private final NavigableSet<KeyedHold> allHolds;
 
     private PerKeyHolds() {
       this.keyedHolds = new HashMap<>();
-      this.allHolds = new PriorityQueue<>();
+      this.allHolds = new TreeSet<>();
     }
 
     /**
@@ -1015,7 +1014,7 @@ public class WatermarkManager {
      * there are no holds within this {@link PerKeyHolds}.
      */
     public Instant getMinHold() {
-      return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.peek().getTimestamp();
+      return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.first().getTimestamp();
     }
 
     /**
@@ -1026,7 +1025,7 @@ public class WatermarkManager {
       removeHold(key);
       KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
       keyedHolds.put(key, newKeyedHold);
-      allHolds.offer(newKeyedHold);
+      allHolds.add(newKeyedHold);
     }
 
     /**