You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/09 14:19:36 UTC
[1/2] incubator-beam git commit: [flink] initialize watermarkTimersQueue with Comparator
Repository: incubator-beam
Updated Branches:
refs/heads/master b6205ffa3 -> e9326c8b1
[flink] initialize watermarkTimersQueue with Comparator
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de6ec823
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de6ec823
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de6ec823
Branch: refs/heads/master
Commit: de6ec8238f16f7505eb17ffa293208dabfa3431a
Parents: 26635d7
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 7 16:49:38 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 7 16:51:24 2016 +0200
----------------------------------------------------------------------
.../wrappers/streaming/WindowDoFnOperator.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de6ec823/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 29ae6ae..075f5df 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -265,7 +265,17 @@ public class WindowDoFnOperator<K, InputT, OutputT>
int numWatermarkTimers = dataIn.readInt();
watermarkTimers = new HashSet<>(numWatermarkTimers);
- watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
+
+ watermarkTimersQueue = new PriorityQueue<>(
+ Math.max(numWatermarkTimers, 1),
+ new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
+ @Override
+ public int compare(
+ Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
+ Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
+ return o1.f1.compareTo(o2.f1);
+ }
+ });
for (int i = 0; i < numWatermarkTimers; i++) {
int length = dataIn.readInt();
[2/2] incubator-beam git commit: This closes #929
Posted by mx...@apache.org.
This closes #929
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9326c8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9326c8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9326c8b
Branch: refs/heads/master
Commit: e9326c8b19c74a070b8ce8612af25b79dfb537ab
Parents: b6205ff de6ec82
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:17:03 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:17:03 2016 +0200
----------------------------------------------------------------------
.../wrappers/streaming/WindowDoFnOperator.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9326c8b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------