You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/29 18:52:58 UTC
[flink] branch release-1.6 updated: [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 850983e [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp.
850983e is described below
commit 850983ec3a869f158cc900b9de6435e47779e3f9
Author: kkloudas <kk...@gmail.com>
AuthorDate: Sun Jul 29 13:54:26 2018 +0200
[FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp.
---
.../api/operators/co/IntervalJoinOperator.java | 41 ++++++++++++++--------
.../api/operators/co/IntervalJoinOperatorTest.java | 5 ++-
2 files changed, 31 insertions(+), 15 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 0c449e6..43085cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -152,6 +152,7 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
@Override
public void open() throws Exception {
super.open();
+
collector = new TimestampedCollector<>(output);
context = new ContextImpl(userFunction);
internalTimerService =
@@ -204,15 +205,15 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
}
@SuppressWarnings("unchecked")
- private <OUR, OTHER> void processElement(
- StreamRecord<OUR> record,
- MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
- MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
- long relativeLowerBound,
- long relativeUpperBound,
- boolean isLeft) throws Exception {
-
- final OUR ourValue = record.getValue();
+ private <THIS, OTHER> void processElement(
+ final StreamRecord<THIS> record,
+ final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
+ final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
+ final long relativeLowerBound,
+ final long relativeUpperBound,
+ final boolean isLeft) throws Exception {
+
+ final THIS ourValue = record.getValue();
final long ourTimestamp = record.getTimestamp();
if (ourTimestamp == Long.MIN_VALUE) {
@@ -257,14 +258,18 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
}
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
- long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+ final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+
collector.setAbsoluteTimestamp(resultTimestamp);
- context.leftTimestamp = leftTimestamp;
- context.rightTimestamp = rightTimestamp;
+ context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
+
userFunction.processElement(left, right, context, collector);
}
- private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
+ private static <T> void addToBuffer(
+ final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
+ final T value,
+ final long timestamp) throws Exception {
List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
if (elemsInBucket == null) {
elemsInBucket = new ArrayList<>();
@@ -313,6 +318,8 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
*/
private final class ContextImpl extends ProcessJoinFunction<T1, T2, OUT>.Context {
+ private long resultTimestamp = Long.MIN_VALUE;
+
private long leftTimestamp = Long.MIN_VALUE;
private long rightTimestamp = Long.MIN_VALUE;
@@ -321,6 +328,12 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
func.super();
}
+ private void updateTimestamps(long left, long right, long result) {
+ this.leftTimestamp = left;
+ this.rightTimestamp = right;
+ this.resultTimestamp = result;
+ }
+
@Override
public long getLeftTimestamp() {
return leftTimestamp;
@@ -333,7 +346,7 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
@Override
public long getTimestamp() {
- return leftTimestamp;
+ return resultTimestamp;
}
@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
index ee3f4d8..53f514b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
@@ -481,13 +481,16 @@ public class IntervalJoinOperatorTest {
TestElem.serializer(),
TestElem.serializer(),
new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+
+ private static final long serialVersionUID = 1L;
+
@Override
public void processElement(
TestElem left,
TestElem right,
Context ctx,
Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
- Assert.assertEquals(left.ts, ctx.getTimestamp());
+ Assert.assertEquals(Math.max(left.ts, right.ts), ctx.getTimestamp());
}
}
);