You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/29 18:39:48 UTC

[flink] branch master updated: [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dc78030  [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp.
dc78030 is described below

commit dc780303e1e4420033949049e4d9368a6d230d88
Author: kkloudas <kk...@gmail.com>
AuthorDate: Sun Jul 29 13:54:26 2018 +0200

    [FLINK-9994][DataStream API] IntervalJoinOp Context#getTimestamp() returns max timestamp.
    
    This closes #6449.
---
 .../api/operators/co/IntervalJoinOperator.java     | 41 ++++++++++++++--------
 .../api/operators/co/IntervalJoinOperatorTest.java |  5 ++-
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 0c449e6..43085cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -152,6 +152,7 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 	@Override
 	public void open() throws Exception {
 		super.open();
+
 		collector = new TimestampedCollector<>(output);
 		context = new ContextImpl(userFunction);
 		internalTimerService =
@@ -204,15 +205,15 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 	}
 
 	@SuppressWarnings("unchecked")
-	private <OUR, OTHER> void processElement(
-			StreamRecord<OUR> record,
-			MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
-			MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
-			long relativeLowerBound,
-			long relativeUpperBound,
-			boolean isLeft) throws Exception {
-
-		final OUR ourValue = record.getValue();
+	private <THIS, OTHER> void processElement(
+			final StreamRecord<THIS> record,
+			final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
+			final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
+			final long relativeLowerBound,
+			final long relativeUpperBound,
+			final boolean isLeft) throws Exception {
+
+		final THIS ourValue = record.getValue();
 		final long ourTimestamp = record.getTimestamp();
 
 		if (ourTimestamp == Long.MIN_VALUE) {
@@ -257,14 +258,18 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 	}
 
 	private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
-		long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+		final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+
 		collector.setAbsoluteTimestamp(resultTimestamp);
-		context.leftTimestamp = leftTimestamp;
-		context.rightTimestamp = rightTimestamp;
+		context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
+
 		userFunction.processElement(left, right, context, collector);
 	}
 
-	private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception {
+	private static <T> void addToBuffer(
+			final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
+			final T value,
+			final long timestamp) throws Exception {
 		List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
 		if (elemsInBucket == null) {
 			elemsInBucket = new ArrayList<>();
@@ -313,6 +318,8 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 	 */
 	private final class ContextImpl extends ProcessJoinFunction<T1, T2, OUT>.Context {
 
+		private long resultTimestamp = Long.MIN_VALUE;
+
 		private long leftTimestamp = Long.MIN_VALUE;
 
 		private long rightTimestamp = Long.MIN_VALUE;
@@ -321,6 +328,12 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 			func.super();
 		}
 
+		private void updateTimestamps(long left, long right, long result) {
+			this.leftTimestamp = left;
+			this.rightTimestamp = right;
+			this.resultTimestamp = result;
+		}
+
 		@Override
 		public long getLeftTimestamp() {
 			return leftTimestamp;
@@ -333,7 +346,7 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 
 		@Override
 		public long getTimestamp() {
-			return leftTimestamp;
+			return resultTimestamp;
 		}
 
 		@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
index ee3f4d8..53f514b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
@@ -481,13 +481,16 @@ public class IntervalJoinOperatorTest {
 				TestElem.serializer(),
 				TestElem.serializer(),
 				new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
+
+					private static final long serialVersionUID = 1L;
+
 					@Override
 					public void processElement(
 						TestElem left,
 						TestElem right,
 						Context ctx,
 						Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
-						Assert.assertEquals(left.ts, ctx.getTimestamp());
+						Assert.assertEquals(Math.max(left.ts, right.ts), ctx.getTimestamp());
 					}
 				}
 			);