You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/09 14:19:36 UTC

[1/2] incubator-beam git commit: [flink] initialize watermarkTimersQueue with Comparator

Repository: incubator-beam
Updated Branches:
  refs/heads/master b6205ffa3 -> e9326c8b1


[flink] initialize watermarkTimersQueue with Comparator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de6ec823
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de6ec823
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de6ec823

Branch: refs/heads/master
Commit: de6ec8238f16f7505eb17ffa293208dabfa3431a
Parents: 26635d7
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 7 16:49:38 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 7 16:51:24 2016 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/WindowDoFnOperator.java          | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de6ec823/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 29ae6ae..075f5df 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -265,7 +265,17 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     int numWatermarkTimers = dataIn.readInt();
 
     watermarkTimers = new HashSet<>(numWatermarkTimers);
-    watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
+
+    watermarkTimersQueue = new PriorityQueue<>(
+            Math.max(numWatermarkTimers, 1),
+            new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
+              @Override
+              public int compare(
+                      Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
+                      Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
+                return o1.f1.compareTo(o2.f1);
+              }
+            });
 
     for (int i = 0; i < numWatermarkTimers; i++) {
       int length = dataIn.readInt();


[2/2] incubator-beam git commit: This closes #929

Posted by mx...@apache.org.
This closes #929


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9326c8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9326c8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9326c8b

Branch: refs/heads/master
Commit: e9326c8b19c74a070b8ce8612af25b79dfb537ab
Parents: b6205ff de6ec82
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:17:03 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:17:03 2016 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/WindowDoFnOperator.java          | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9326c8b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------