You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/15 13:42:23 UTC

flink git commit: [FLINK-6001] Fix ContinuousEventTimeTrigger firing without state

Repository: flink
Updated Branches:
  refs/heads/master a3627f201 -> 0a501e9f7


[FLINK-6001] Fix ContinuousEventTimeTrigger firing without state


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a501e9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a501e9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a501e9f

Branch: refs/heads/master
Commit: 0a501e9f7f56baba2905002b74746998458db007
Parents: a3627f2
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Mar 13 15:04:01 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Mar 15 14:41:36 2017 +0100

----------------------------------------------------------------------
 .../triggers/ContinuousEventTimeTrigger.java    |  11 +-
 .../ContinuousEventTimeTriggerTest.java         | 207 +++++++++++++++++++
 .../operators/windowing/TriggerTestHarness.java |  35 +++-
 3 files changed, 238 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a501e9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index f3b3e4f..3e31c09 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -77,10 +77,13 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 			return TriggerResult.FIRE;
 		}
 
-		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-		if (fireTimestamp.get().equals(time)) {
-			fireTimestamp.clear();
-			fireTimestamp.add(time + interval);
+		ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
+
+		Long fireTimestamp = fireTimestampState.get();
+
+		if (fireTimestamp != null && fireTimestamp.equals(time)) {
+			fireTimestampState.clear();
+			fireTimestampState.add(time + interval);
 			ctx.registerEventTimeTimer(time + interval);
 			return TriggerResult.FIRE;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a501e9f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
new file mode 100644
index 0000000..0f65a88
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ContinuousEventTimeTrigger}.
+ */
+public class ContinuousEventTimeTriggerTest {
+
+	/**
+	 * Verify that the trigger doesn't fail with an NPE when an event-time timer fires for a
+	 * window that has no trigger state.
+	 */
+	@Test
+	public void testTriggerHandlesAllOnTimerCalls() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.milliseconds(5)), new TimeWindow.Serializer());
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+
+		// this will make the elements we now process fall into late windows, i.e. no trigger state
+		// will be created
+		testHarness.advanceWatermark(10);
+
+		// a late element makes the trigger fire immediately
+		assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+		// simulate a GC timer firing; the result is not asserted, the call just must not throw
+		testHarness.invokeOnEventTime(20, new TimeWindow(0, 2));
+	}
+
+
+	/**
+	 * Verify that state of separate windows does not leak into other windows.
+	 */
+	@Test
+	public void testWindowSeparationAndFiring() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer());
+
+		// inject several elements into two distinct windows
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(4, testHarness.numEventTimeTimers()); // two timers per window, see below
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		Collection<Tuple2<TimeWindow, TriggerResult>> triggerResults = testHarness.advanceWatermark(2);
+		boolean sawFiring = false;
+		for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) {
+			if (r.f0.equals(new TimeWindow(0, 2))) {
+				sawFiring = true;
+				assertTrue(r.f1.equals(TriggerResult.FIRE));
+			}
+		}
+		assertTrue(sawFiring);
+
+		assertEquals(2, testHarness.numStateEntries()); // firing does not remove trigger state
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(3, testHarness.numEventTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); // down to one timer
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); // unaffected
+
+		triggerResults = testHarness.advanceWatermark(4);
+		sawFiring = false;
+		for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) {
+			if (r.f0.equals(new TimeWindow(2, 4))) {
+				sawFiring = true;
+				assertTrue(r.f1.equals(TriggerResult.FIRE));
+			}
+		}
+		assertTrue(sawFiring);
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(2, testHarness.numEventTimeTimers()); // one more timer gone after this advance
+	}
+
+	/**
+	 * Verify that late elements trigger immediately and that neither trigger state nor
+	 * timers are created for them.
+	 */
+	@Test
+	public void testLateElementTriggersImmediately() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer());
+
+		testHarness.advanceWatermark(2); // move the watermark first so the element below is late
+
+		assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+	}
+
+
+	/**
+	 * Verify that clear() only affects the given window and does not leak across windows.
+	 */
+	@Test
+	public void testClear() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(4, testHarness.numEventTimeTimers()); // two timers per window, see below
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+		assertEquals(1, testHarness.numStateEntries()); // one state entry was dropped
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(3, testHarness.numEventTimeTimers());
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); // other window untouched
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); // only one timer removed
+
+		testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(2, testHarness.numEventTimeTimers()); // clear() does not remove every timer
+	}
+
+	@Test
+	public void testMergingWindows() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer());
+
+		assertTrue(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)).canMerge()); // precondition for the merge below
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(4, testHarness.numEventTimeTimers()); // two timers per window, see below
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+		testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+		assertEquals(1, testHarness.numStateEntries()); // the two state entries collapsed into one
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(5, testHarness.numEventTimeTimers()); // on merging, timers are not cleaned up
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+		assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+		assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 4))); // one new timer for the merge result
+
+		Collection<Tuple2<TimeWindow, TriggerResult>> triggerResults = testHarness.advanceWatermark(4);
+		boolean sawFiring = false;
+		for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) {
+			if (r.f0.equals(new TimeWindow(0, 4))) {
+				sawFiring = true;
+				assertTrue(r.f1.equals(TriggerResult.FIRE));
+			}
+		}
+
+		assertTrue(sawFiring);
+
+		assertEquals(1, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(1, testHarness.numEventTimeTimers()); // only one of the five timers is left
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a501e9f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index b9923f2..4267444 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -82,7 +82,7 @@ public class TriggerTestHarness<T, W extends Window> {
 				new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
 		this.stateBackend = stateBackend;
 
-		this.stateBackend.setCurrentKey(0);
+		this.stateBackend.setCurrentKey(KEY);
 
 		this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
 			@Override
@@ -215,22 +215,35 @@ public class TriggerTestHarness<T, W extends Window> {
 		Collection<Tuple2<W, TriggerResult>> result = new ArrayList<>();
 
 		for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) {
-			TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
-					KEY,
-					timer.getNamespace(),
-					internalTimerService,
-					stateBackend,
-					windowSerializer);
-
-			TriggerResult triggerResult =
-					trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext);
-
+			TriggerResult triggerResult = invokeOnEventTime(timer);
 			result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
 		}
 
 		return result;
 	}
 
+	private TriggerResult invokeOnEventTime(TestInternalTimerService.Timer<Integer, W> timer) throws Exception {
+		TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+				KEY,
+				timer.getNamespace(), // the timer's namespace is the window the timer belongs to
+				internalTimerService,
+				stateBackend,
+				windowSerializer);
+
+		return trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext);
+	}
+
+	/**
+	 * Manually invoke {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)} with
+	 * the given timestamp and window; no timer has to be registered for them beforehand.
+	 */
+	public TriggerResult invokeOnEventTime(long timestamp, W window) throws Exception {
+		TestInternalTimerService.Timer<Integer, W> timer =
+				new TestInternalTimerService.Timer<>(timestamp, KEY, window);
+
+		return invokeOnEventTime(timer);
+	}
+
 	/**
 	 * Calls {@link Trigger#onMerge(Window, Trigger.OnMergeContext)} with the given
 	 * parameters. This also calls {@link Trigger#clear(Window, Trigger.TriggerContext)} on the