You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/15 13:42:23 UTC
flink git commit: [FLINK-6001] Fix ContinuousEventTimeTrigger firing without state
Repository: flink
Updated Branches:
refs/heads/master a3627f201 -> 0a501e9f7
[FLINK-6001] Fix ContinuousEventTimeTrigger firing without state
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a501e9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a501e9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a501e9f
Branch: refs/heads/master
Commit: 0a501e9f7f56baba2905002b74746998458db007
Parents: a3627f2
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Mar 13 15:04:01 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Mar 15 14:41:36 2017 +0100
----------------------------------------------------------------------
.../triggers/ContinuousEventTimeTrigger.java | 11 +-
.../ContinuousEventTimeTriggerTest.java | 207 +++++++++++++++++++
.../operators/windowing/TriggerTestHarness.java | 35 +++-
3 files changed, 238 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0a501e9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index f3b3e4f..3e31c09 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -77,10 +77,13 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
return TriggerResult.FIRE;
}
- ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
- if (fireTimestamp.get().equals(time)) {
- fireTimestamp.clear();
- fireTimestamp.add(time + interval);
+ ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
+
+ Long fireTimestamp = fireTimestampState.get();
+
+ if (fireTimestamp != null && fireTimestamp.equals(time)) {
+ fireTimestampState.clear();
+ fireTimestampState.add(time + interval);
ctx.registerEventTimeTimer(time + interval);
return TriggerResult.FIRE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a501e9f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
new file mode 100644
index 0000000..0f65a88
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ContinuousEventTimeTrigger}.
+ */
+public class ContinuousEventTimeTriggerTest {
+
+ /**
+ * Verify that the trigger doesn't fail with an NPE if we insert a timer firing when there is
+ * no trigger state.
+ */
+ @Test
+ public void testTriggerHandlesAllOnTimerCalls() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.milliseconds(5)), new TimeWindow.Serializer());
+
+ assertEquals(0, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ // this will make the elements we now process fall into late windows, i.e. no trigger state
+ // will be created
+ testHarness.advanceWatermark(10);
+
+ // a late element fires the trigger immediately
+ assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+ // simulate a GC timer firing
+ testHarness.invokeOnEventTime(20, new TimeWindow(0, 2));
+ }
+
+
+ /**
+ * Verify that state of separate windows does not leak into other windows.
+ */
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer());
+
+ // inject several elements
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(4, testHarness.numEventTimeTimers());
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+ Collection<Tuple2<TimeWindow, TriggerResult>> triggerResults = testHarness.advanceWatermark(2);
+ boolean sawFiring = false;
+ for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) {
+ if (r.f0.equals(new TimeWindow(0, 2))) {
+ sawFiring = true;
+ assertTrue(r.f1.equals(TriggerResult.FIRE));
+ }
+ }
+ assertTrue(sawFiring);
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(3, testHarness.numEventTimeTimers());
+ assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+ triggerResults = testHarness.advanceWatermark(4);
+ sawFiring = false;
+ for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) {
+ if (r.f0.equals(new TimeWindow(2, 4))) {
+ sawFiring = true;
+ assertTrue(r.f1.equals(TriggerResult.FIRE));
+ }
+ }
+ assertTrue(sawFiring);
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(2, testHarness.numEventTimeTimers());
+ }
+
+ /**
+ * Verify that late elements trigger immediately and also that we don't set a timer
+ * for those.
+ */
+ @Test
+ public void testLateElementTriggersImmediately() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer());
+
+ testHarness.advanceWatermark(2);
+
+ assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+ }
+
+
+ /**
+ * Verify that clear() does not leak across windows.
+ */
+ @Test
+ public void testClear() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(4, testHarness.numEventTimeTimers());
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+ testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(3, testHarness.numEventTimeTimers());
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+ testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+ assertEquals(0, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(2, testHarness.numEventTimeTimers()); // doesn't clean up timers
+ }
+
+ @Test
+ public void testMergingWindows() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)), new TimeWindow.Serializer());
+
+ assertTrue(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)).canMerge());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(4, testHarness.numEventTimeTimers());
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+ testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)));
+
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(5, testHarness.numEventTimeTimers()); // on merging, timers are not cleaned up
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(2, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+ assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 4)));
+
+ Collection<Tuple2<TimeWindow, TriggerResult>> triggerResults = testHarness.advanceWatermark(4);
+ boolean sawFiring = false;
+ for (Tuple2<TimeWindow, TriggerResult> r : triggerResults) {
+ if (r.f0.equals(new TimeWindow(0, 4))) {
+ sawFiring = true;
+ assertTrue(r.f1.equals(TriggerResult.FIRE));
+ }
+ }
+
+ assertTrue(sawFiring);
+
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(1, testHarness.numEventTimeTimers());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a501e9f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index b9923f2..4267444 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -82,7 +82,7 @@ public class TriggerTestHarness<T, W extends Window> {
new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
this.stateBackend = stateBackend;
- this.stateBackend.setCurrentKey(0);
+ this.stateBackend.setCurrentKey(KEY);
this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
@Override
@@ -215,22 +215,35 @@ public class TriggerTestHarness<T, W extends Window> {
Collection<Tuple2<W, TriggerResult>> result = new ArrayList<>();
for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) {
- TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
- KEY,
- timer.getNamespace(),
- internalTimerService,
- stateBackend,
- windowSerializer);
-
- TriggerResult triggerResult =
- trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext);
-
+ TriggerResult triggerResult = invokeOnEventTime(timer);
result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
}
return result;
}
+ private TriggerResult invokeOnEventTime(TestInternalTimerService.Timer<Integer, W> timer) throws Exception {
+ TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+ KEY,
+ timer.getNamespace(),
+ internalTimerService,
+ stateBackend,
+ windowSerializer);
+
+ return trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), triggerContext);
+ }
+
+ /**
+ * Manually invoke {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)} with
+ * the given parameters.
+ */
+ public TriggerResult invokeOnEventTime(long timestamp, W window) throws Exception {
+ TestInternalTimerService.Timer<Integer, W> timer =
+ new TestInternalTimerService.Timer<>(timestamp, KEY, window);
+
+ return invokeOnEventTime(timer);
+ }
+
/**
* Calls {@link Trigger#onMerge(Window, Trigger.OnMergeContext)} with the given
* parameters. This also calls {@link Trigger#clear(Window, Trigger.TriggerContext)} on the