Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2016/09/29 15:51:52 UTC
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
GitHub user aljoscha opened a pull request:
https://github.com/apache/flink/pull/2572
[FLINK-4552] Refactor WindowOperator/Trigger Tests
This builds on #2570
Before, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction
were all conflated in WindowOperatorTest. All of these tested that a
certain combination of a Trigger, WindowAssigner and WindowFunction produces
the expected output.
This change modularizes these tests and spreads them out across multiple
files. For example, one per trigger/window assigner.
The new WindowOperatorTest tests verify that the interaction between
WindowOperator and the various other parts works as expected, that the
correct methods on Trigger and WindowFunction are called at the expected
time and that snapshotting, timers, cleanup etc. work correctly. These tests
also verify that the different state types and WindowFunctions work correctly.
For trigger tests this introduces TriggerTestHarness. This can be used
to inject elements into Triggers and check that they fire at the correct
times. The actual output of the WindowFunction is not important for these
tests.
The new tests also make sure that triggers correctly clean up state and timers.
WindowAssigner tests verify the behaviour of window assigners in isolation.
They also test, for example, whether the offset parameter of time-based
windows works correctly.
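As an illustration of what an offset test for a time-based window assigner has to pin down, here is a minimal, self-contained sketch of the window-start arithmetic. The class `TumblingWindowSketch` and the method `windowStart` are hypothetical names chosen for this example; they are not taken from the pull request and do not claim to match Flink's implementation.

```java
// Simplified stand-in for a tumbling window assigner (hypothetical, not Flink code).
final class TumblingWindowSketch {

    /**
     * Returns the start of the window of length {@code size}, shifted by
     * {@code offset}, that contains {@code timestamp}.
     */
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the remainder non-negative, so timestamps that lie
        // before the offset are still assigned to the correct window.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        // With size 5 and offset 2 the windows are ..., [-3, 2), [2, 7), [7, 12), ...
        System.out.println(windowStart(6, 2, 5)); // prints 2
        System.out.println(windowStart(7, 2, 5)); // prints 7
        System.out.println(windowStart(1, 2, 5)); // prints -3
    }
}
```

A test in isolation would assert exactly these boundary cases: the last timestamp of one window, the first timestamp of the next, and a timestamp smaller than the offset.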
R: @StephanEwen @StefanRRichter @kl0u for review
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/flink window-test-refactor
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2572.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2572
----
commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2016-09-25T18:58:16Z
Rename TimeServiceProvider to ProcessingTimeService
The name is clashing with the soon-to-be-added
TimerService/InternalTimerService which is meant as an interface for
dealing with both processing time and event time.
TimeServiceProvider is renamed to ProcessingTimeService to reflect the
fact that it is a low-level utility that only deals with "physical"
processing-time trigger tasks.
commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2016-09-28T13:10:35Z
Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests
commit f6dd9c74dc2c58c4263fb6d084651b514898d47a
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2016-09-28T14:35:33Z
Use Processing-Time Service of TestHarness in WindowOperatorTest
Before, this was manually creating a TestProcessingTimeService. Now
we're using the one that is there by default in
OneInputStreamOperatorTestHarness.
commit 65389d66c5586e6707b7a6bf48df512354fac085
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2016-09-28T14:43:40Z
Refactor OperatorTestHarness to always use TestProcessingTimeService
Before, this would allow handing in a custom ProcessingTimeService but
this was in reality always TestProcessingTimeService.
commit 1d013bcacc040552e5783c64d094ec309014457b
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2016-09-28T13:12:26Z
Use TestHarness Processing-time Facility in BucketingSinkTest
Before, this was manually creating a TestProcessingTimeService. Now we
use the one that is there by default in
OneInputStreamOperatorTestHarness.
commit eaf3dd00fefeb2487c7cafff6337123cbe42874b
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2016-09-28T13:32:24Z
Use OperatorTestHarness in AlignedWindowOperator Tests
commit b597d2ef50c27554b83fddaff8873107265340d4
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2016-09-29T14:04:29Z
Refactor Operator TestHarnesses to use Common Base Class
This also introduces KeyedTwoInputStreamOperatorTestHarness which
is similar to KeyedOneInputStreamOperatorTestHarness
commit 58b16b26e07b6100f89e9deec63f0decb751f0e6
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2016-09-26T14:21:51Z
[FLINK-3674] Add an interface for Time aware User Functions
This moves the event-time/processing-time trigger code from
WindowOperator behind a well defined interface that can be used by
operators (and user functions).
InternalTimerService is the new interface that has the same
functionality that WindowOperator used to have. TimerService is the user
facing interface that does not allow dealing with namespaces/payloads
and also does not allow deleting timers. There is a default
implementation in HeapInternalTimerService that can checkpoint timers to
a stream and also restore from a stream. Right now, this is managed in
AbstractStreamOperator and operators can ask for an
InternalTimerService.
This also adds tests for HeapInternalTimerService.
This adds two new user functions:
- TimelyFlatMapFunction: an extension of FlatMapFunction that also
allows querying time and setting timers
- TimelyCoFlatMapFunction: the same, but for CoFlatMapFunction
There are two new StreamOperator implementations for these that use the
InternalTimerService interface.
This also adds tests for the two new operators.
This also adds the new interface KeyContext that is used for
setting/querying the current key context for state and timers. Timers
are always scoped to a key, for now.
Also, this moves the handling of watermarks for both one-input and
two-input operators to AbstractStreamOperator so that we have a central
ground truth.
commit e351b4a409e50c53645ebf7bdbec263148fa956b
Author: Aljoscha Krettek <al...@gmail.com>
Date: 2016-09-05T10:01:11Z
[FLINK-4552] Refactor WindowOperator/Trigger Tests
Before, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction
were all conflated in WindowOperatorTest. All of these tested that a
certain combination of a Trigger, WindowAssigner and WindowFunction produces
the expected output.
This change modularizes these tests and spreads them out across multiple
files. For example, one per trigger/window assigner.
The new WindowOperatorTest tests verify that the interaction between
WindowOperator and the various other parts works as expected, that the
correct methods on Trigger and WindowFunction are called at the expected
time and that snapshotting, timers, cleanup etc. work correctly. These tests
also verify that the different state types and WindowFunctions work correctly.
For trigger tests this introduces TriggerTestHarness. This can be used
to inject elements into Triggers and check that they fire at the correct
times. The actual output of the WindowFunction is not important for these
tests.
The new tests also make sure that triggers correctly clean up state and timers.
WindowAssigner tests verify the behaviour of window assigners in isolation.
They also test, for example, whether the offset parameter of time-based
windows works correctly.
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85359406
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+ /**
+ * Verify that state of separate windows does not leak into other windows.
+ */
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // right now, CountTrigger will clear its state in onElement when firing
+ // ideally, this should be moved to onFire()
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+ }
--- End diff --
You could also let W(2,4) fire for completeness, to verify that its count was not reset by the previous firing of W(0,2).
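The suggested extra check can be sketched with a small stand-in for a per-window count trigger. This is a simplified model written for this thread, not Flink's `CountTrigger` and not the `TriggerTestHarness` from the diff; the class `CountPerWindowSketch`, its methods, and the string window labels are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a count trigger with per-window state (hypothetical, not Flink code).
final class CountPerWindowSketch {

    enum Result { CONTINUE, FIRE }

    private final int maxCount;
    // One counter per window; this is the state that must not leak between windows.
    private final Map<String, Integer> counts = new HashMap<>();

    CountPerWindowSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    Result onElement(String window) {
        int count = counts.merge(window, 1, Integer::sum);
        if (count >= maxCount) {
            // Clear only the state of the window that fired.
            counts.remove(window);
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    int numStateEntries() {
        return counts.size();
    }

    public static void main(String[] args) {
        CountPerWindowSketch trigger = new CountPerWindowSketch(3);
        trigger.onElement("W(0,2)");
        trigger.onElement("W(2,4)");
        trigger.onElement("W(0,2)");
        System.out.println(trigger.onElement("W(0,2)")); // prints FIRE
        // W(2,4) has seen one element so far; firing W(0,2) must not reset that.
        System.out.println(trigger.onElement("W(2,4)")); // prints CONTINUE
        System.out.println(trigger.onElement("W(2,4)")); // prints FIRE
        System.out.println(trigger.numStateEntries());   // prints 0
    }
}
```

If firing W(0,2) had wrongly cleared the counter of W(2,4), the last element above would return CONTINUE instead of FIRE, which is the regression the extra assertion would catch.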
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85361303
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+ private static final Integer KEY = 1;
+
+ private final Trigger<T, W> trigger;
+ private final TypeSerializer<W> windowSerializer;
+
+ private final HeapKeyedStateBackend<Integer> stateBackend;
+ private final TestInternalTimerService<Integer, W> internalTimerService;
+
+ public TriggerTestHarness(
+ Trigger<T, W> trigger,
+ TypeSerializer<W> windowSerializer) throws Exception {
+ this.trigger = trigger;
+ this.windowSerializer = windowSerializer;
+
+ // we only ever use one key, other tests make sure that windows work across different
+ // keys
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ @SuppressWarnings("unchecked")
+ HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+ this.stateBackend = stateBackend;
+
+ this.stateBackend.setCurrentKey(0);
+
+ this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
+ @Override
+ public void setCurrentKey(Object key) {
+ // ignore
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return KEY;
+ }
+ });
+ }
+
+ public int numProcessingTimeTimers() {
+ return internalTimerService.numProcessingTimeTimers();
+ }
+
+ public int numProcessingTimeTimers(W window) {
+ return internalTimerService.numProcessingTimeTimers(window);
+ }
+
+ public int numEventTimeTimers() {
+ return internalTimerService.numEventTimeTimers();
+ }
+
+ public int numEventTimeTimers(W window) {
+ return internalTimerService.numEventTimeTimers(window);
+ }
+
+ public int numStateEntries() {
+ return stateBackend.numStateEntries();
+ }
+
+ public int numStateEntries(W window) {
+ return stateBackend.numStateEntries(window);
+ }
+
+ /**
+ * Injects one element into the trigger for the given window and returns the result of
+ * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
+ */
+ public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
+ TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+ KEY,
+ window,
+ internalTimerService,
+ stateBackend,
+ windowSerializer);
+ return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
+ }
+
+ /**
+ * Advances processing time and checks whether we have exactly one firing for the given
+ * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
+ * is returned for that firing.
+ */
+ public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
+ Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
+ }
+
+ Tuple2<W, TriggerResult> firing = firings.iterator().next();
+
+ if (!firing.f0.equals(window)) {
+ throw new IllegalStateException("Trigger fired for another window.");
+ }
+
+ return firing.f1;
+ }
+
+ /**
+ * Advances the watermark and checks whether we have exactly one firing for the given
+ * window. The result of {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}
+ * is returned for that firing.
+ */
+ public TriggerResult advanceWatermark(long time, W window) throws Exception {
+ Collection<Tuple2<W, TriggerResult>> firings = advanceWatermark(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
+ }
+
+ Tuple2<W, TriggerResult> firing = firings.iterator().next();
--- End diff --
Same here.
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85501090
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+ private static final Integer KEY = 1;
+
+ private final Trigger<T, W> trigger;
+ private final TypeSerializer<W> windowSerializer;
+
+ private final HeapKeyedStateBackend<Integer> stateBackend;
+ private final TestInternalTimerService<Integer, W> internalTimerService;
+
+ public TriggerTestHarness(
+ Trigger<T, W> trigger,
+ TypeSerializer<W> windowSerializer) throws Exception {
+ this.trigger = trigger;
+ this.windowSerializer = windowSerializer;
+
+ // we only ever use one key, other tests make sure that windows work across different
+ // keys
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ @SuppressWarnings("unchecked")
+ HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+ this.stateBackend = stateBackend;
+
+ this.stateBackend.setCurrentKey(0);
+
+ this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
+ @Override
+ public void setCurrentKey(Object key) {
+ // ignore
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return KEY;
+ }
+ });
+ }
+
+ public int numProcessingTimeTimers() {
+ return internalTimerService.numProcessingTimeTimers();
+ }
+
+ public int numProcessingTimeTimers(W window) {
+ return internalTimerService.numProcessingTimeTimers(window);
+ }
+
+ public int numEventTimeTimers() {
+ return internalTimerService.numEventTimeTimers();
+ }
+
+ public int numEventTimeTimers(W window) {
+ return internalTimerService.numEventTimeTimers(window);
+ }
+
+ public int numStateEntries() {
+ return stateBackend.numStateEntries();
+ }
+
+ public int numStateEntries(W window) {
+ return stateBackend.numStateEntries(window);
+ }
+
+ /**
+ * Injects one element into the trigger for the given window and returns the result of
+ * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
+ */
+ public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
+ TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+ KEY,
+ window,
+ internalTimerService,
+ stateBackend,
+ windowSerializer);
+ return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
+ }
+
+ /**
+ * Advances processing time and checks whether we have exactly one firing for the given
+ * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
+ * is returned for that firing.
+ */
+ public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
+ Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
+ }
+
+ Tuple2<W, TriggerResult> firing = firings.iterator().next();
--- End diff --
True :-)
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85360454
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EventTimeTrigger}.
+ */
+public class EventTimeTriggerTest {
+
+ /**
+ * Verify that state of separate windows does not leak into other windows.
+ */
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+ // inject several elements
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ assertEquals(0, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(2, testHarness.numEventTimeTimers());
+ assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(2, new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(1, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(2, 4)));
+
--- End diff --
We could also check that multiple triggers fire when a watermark passes them all at once (and that not all of them fire, if we use three triggers for this), to catch corner cases.
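A self-contained sketch of that corner case, using a toy timer registry instead of the real harness. The class `WatermarkTimerSketch` and its methods are hypothetical. The sketch assumes one event-time timer per window at `end - 1` and, for brevity, supports only one window per timer timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy event-time timer registry (hypothetical, not Flink code).
final class WatermarkTimerSketch {

    // Timer timestamp -> window label, sorted by timestamp.
    // Assumption: at most one window per timestamp, which is enough for this sketch.
    private final TreeMap<Long, String> timers = new TreeMap<>();

    void registerWindow(long start, long end) {
        // Assumption: the timer is set to the last timestamp that belongs to the window.
        timers.put(end - 1, "W(" + start + "," + end + ")");
    }

    /** Fires and removes every timer whose timestamp is at or before the watermark. */
    List<String> advanceWatermark(long watermark) {
        Map<Long, String> due = timers.headMap(watermark, true);
        List<String> fired = new ArrayList<>(due.values());
        due.clear(); // headMap is a view, so this removes the fired timers
        return fired;
    }

    int numTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        WatermarkTimerSketch service = new WatermarkTimerSketch();
        service.registerWindow(0, 2);
        service.registerWindow(2, 4);
        service.registerWindow(4, 6);
        // A single watermark passes the first two timers but not the third.
        System.out.println(service.advanceWatermark(4)); // prints [W(0,2), W(2,4)]
        System.out.println(service.numTimers());         // prints 1
    }
}
```

With three windows, one watermark that passes two timers exercises both halves of the check: several timers fire together, and the timer that is still in the future stays registered.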
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85500578
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyOnMergeContext;
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTimeWindow;
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTriggerContext;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link PurgingTrigger}.
+ */
+public class PurgingTriggerTest {
+
+ /**
+ * Check if {@link PurgingTrigger} implements all methods of {@link Trigger}, as a sanity
+ * check.
+ */
+ @Test
+ public void testAllMethodsImplemented() throws NoSuchMethodException {
--- End diff --
`Trigger` has `canMerge()` and `onMerge()`, which are not abstract in `Trigger` (`clear()` used to be non-abstract as well but is abstract now, after we recently changed that on master). The purging trigger can therefore be non-abstract and still not implement these methods, hence the checks.
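The idea behind such a check can be sketched with plain reflection. The types below (`BaseTrigger`, `WrappingTrigger`) and the helper `missingOverrides` are hypothetical stand-ins written for this example; they are not the classes or the test code from the pull request.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Reflection-based sanity check that a wrapper overrides every method of its base class
// (hypothetical sketch, not the test from the pull request).
final class OverrideCheckSketch {

    abstract static class BaseTrigger {
        abstract String onElement(Object element);

        // Non-abstract methods: the compiler does not force subclasses to override these.
        boolean canMerge() {
            return false;
        }

        void onMerge() {
            throw new UnsupportedOperationException("merging not supported");
        }
    }

    static class WrappingTrigger extends BaseTrigger {
        private final BaseTrigger nested;

        WrappingTrigger(BaseTrigger nested) {
            this.nested = nested;
        }

        @Override
        String onElement(Object element) {
            return nested.onElement(element);
        }

        @Override
        boolean canMerge() {
            return nested.canMerge();
        }

        // onMerge() is deliberately not forwarded, so the check below reports it.
    }

    /** Returns the names of instance methods declared in {@code base} that {@code sub} does not redeclare. */
    static List<String> missingOverrides(Class<?> base, Class<?> sub) {
        List<String> missing = new ArrayList<>();
        for (Method method : base.getDeclaredMethods()) {
            int modifiers = method.getModifiers();
            if (method.isSynthetic() || Modifier.isStatic(modifiers) || Modifier.isPrivate(modifiers)) {
                continue; // not overridable or compiler-generated
            }
            try {
                sub.getDeclaredMethod(method.getName(), method.getParameterTypes());
            } catch (NoSuchMethodException e) {
                missing.add(method.getName());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(missingOverrides(BaseTrigger.class, WrappingTrigger.class)); // prints [onMerge]
    }
}
```

A wrapper that forgets to forward a non-abstract method still compiles and silently falls back to the base behaviour, which is why a reflective check like this is useful as a sanity test.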
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85499215
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsWindowsTest.java ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link EventTimeSessionWindows}
+ */
+public class EventTimeSessionWindowsWindowsTest extends TestLogger {
+
+ @Test
+ public void testWindowAssignment() {
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000));
--- End diff --
Done
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85498801
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+ /**
+ * Verify that state of separate windows does not leak into other windows.
+ */
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // right now, CountTrigger will clear its state in onElement when firing
+ // ideally, this should be moved to onFire()
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+ }
+
+ /**
+ * Verify that clear() does not leak across windows.
+ */
+ @Test
+ public void testClear() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+ assertEquals(0, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+ }
+
+ @Test
+ public void testMergingWindows() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
--- End diff --
Done. There are also more tests specifically about merging in `MergingWindowSetTest` and `WindowOperatorTest`. The core of the merging "algorithm" is exercised in `MergingWindowSetTest`.
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:
https://github.com/apache/flink/pull/2572
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85360968
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyOnMergeContext;
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTimeWindow;
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTriggerContext;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link PurgingTrigger}.
+ */
+public class PurgingTriggerTest {
+
+ /**
+ * Check if {@link PurgingTrigger} implements all methods of {@link Trigger}, as a sanity
+ * check.
+ */
+ @Test
+ public void testAllMethodsImplemented() throws NoSuchMethodException {
--- End diff --
What is the purpose of this and how is it different from checking that PurgingTrigger is not abstract?
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2572
manually merged
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/2572
Had another look at the changes, I think it is a very well written test! +1 for merge.
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/2572
Yeah, I know the pain ;-) but next time, when we only need to fix half the number of tests for a change, it will quickly pay off (hopefully).
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2572
Yeah, alright, I'm just lazy. 😅 I'll whip something up.
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2572
R: @StefanRRichter @kl0u, this sits on top of #2570, so you can review that one first and then the one additional commit here.
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2572
No, it's more or less that but there is more stuff that needs to be in that adapter, for example in `testOnProcessingTimeFire()`, I'm highlighting the places that need changing:
```
public void testOnProcessingTimeFire() throws Exception {
    WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
    when(mockAssigner.isEventTime()).thenReturn(false); // <-- here
    Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
    KeyedOneInputStreamOperatorTestHarness<Integer, Integer, WindowedValue<List<Integer>, TimeWindow>> testHarness =
        createListWindowOperator(mockAssigner, mockTrigger, 0L);
    testHarness.open();
    testHarness.setProcessingTime(Long.MIN_VALUE); // <-- here
    when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
        .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));
    assertEquals(0, testHarness.getOutput().size());
    assertEquals(0, testHarness.numKeyedStateEntries());
    doAnswer(new Answer<TriggerResult>() {
        @Override
        public TriggerResult answer(InvocationOnMock invocation) throws Exception {
            Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
            context.registerProcessingTimeTimer(0L); // <-- here
            context.getPartitionedState(valueStateDescriptor).update("hello");
            return TriggerResult.CONTINUE;
        }
    }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
    shouldFireOnProcessingTime(mockTrigger); // <-- here
    testHarness.processElement(new StreamRecord<>(0, 0L));
    assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents and trigger state for two windows
    assertEquals(4, testHarness.numProcessingTimeTimers()); // timers/gc timers for two windows <-- here
    testHarness.setProcessingTime(0L); // <-- here
    // clear is only called at cleanup time/GC time
    verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
    // FIRE should not purge contents
    assertEquals(4, testHarness.numKeyedStateEntries());
    assertEquals(2, testHarness.numProcessingTimeTimers()); // only gc timers left <-- here
    // there should be two elements now
    assertThat(testHarness.extractOutputStreamRecords(),
        containsInAnyOrder(
            isWindowedValue(contains(0), 1L, timeWindow(0, 2)),
            isWindowedValue(contains(0), 3L, timeWindow(2, 4))));
}
```
(man I can't make text bold in a code block ... 😭)
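For the record, a rough sketch of what such an adapter could look like, with every time-domain-specific call from the places marked above pulled behind one interface. All names here (`FakeTimerHarness`, `TimeDomainAdapter`, `ProcessingTimeAdapter`, `EventTimeAdapter`, `SharedTimerTest`) are hypothetical and heavily simplified. None of this is the real test harness API, it only shows the shape of the idea.

```java
import java.util.TreeSet;

// Hypothetical, heavily simplified stand-in for a harness with two timer domains.
class FakeTimerHarness {
    final TreeSet<Long> processingTimeTimers = new TreeSet<>();
    final TreeSet<Long> eventTimeTimers = new TreeSet<>();

    /** Fires (removes) all processing-time timers up to and including the given time. */
    int setProcessingTime(long time) { return fire(processingTimeTimers, time); }

    /** Fires (removes) all event-time timers up to and including the given watermark. */
    int processWatermark(long watermark) { return fire(eventTimeTimers, watermark); }

    private static int fire(TreeSet<Long> timers, long time) {
        int fired = 0;
        while (!timers.isEmpty() && timers.first() <= time) {
            timers.pollFirst();
            fired++;
        }
        return fired;
    }
}

// Hypothetical adapter: everything a shared test body needs that differs per time domain.
interface TimeDomainAdapter {
    boolean isEventTime();
    void registerTimer(FakeTimerHarness harness, long time);
    int advanceTime(FakeTimerHarness harness, long time);
    int numTimers(FakeTimerHarness harness);
}

class ProcessingTimeAdapter implements TimeDomainAdapter {
    public boolean isEventTime() { return false; }
    public void registerTimer(FakeTimerHarness h, long time) { h.processingTimeTimers.add(time); }
    public int advanceTime(FakeTimerHarness h, long time) { return h.setProcessingTime(time); }
    public int numTimers(FakeTimerHarness h) { return h.processingTimeTimers.size(); }
}

class EventTimeAdapter implements TimeDomainAdapter {
    public boolean isEventTime() { return true; }
    public void registerTimer(FakeTimerHarness h, long time) { h.eventTimeTimers.add(time); }
    public int advanceTime(FakeTimerHarness h, long time) { return h.processWatermark(time); }
    public int numTimers(FakeTimerHarness h) { return h.eventTimeTimers.size(); }
}

final class SharedTimerTest {
    /** One test body that runs unchanged for either time domain; returns the timers fired. */
    static int registerAndFire(TimeDomainAdapter adapter) {
        FakeTimerHarness harness = new FakeTimerHarness();
        adapter.registerTimer(harness, 0L);
        adapter.registerTimer(harness, 10L);
        if (adapter.numTimers(harness) != 2) {
            throw new AssertionError("expected two registered timers");
        }
        return adapter.advanceTime(harness, 0L);
    }
}
```

The trade-off is that the adapter interface grows with every time-specific call the test bodies make, which is why it ends up being more than a thin wrapper.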
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85359630
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+ /**
+ * Verify that state of separate windows does not leak into other windows.
+ */
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // right now, CountTrigger will clear its state in onElement when firing
+ // ideally, this should be moved to onFire()
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+ }
+
+ /**
+ * Verify that clear() does not leak across windows.
+ */
+ @Test
+ public void testClear() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+ assertEquals(0, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+ }
+
+ @Test
+ public void testMergingWindows() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
--- End diff --
You could test this a little more, e.g. with a wider window that subsumes both, or with three windows of which only two are merged, to catch corner cases.
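To make the suggested corner cases concrete, here is a small self-contained sketch of the two scenarios. It models per-window count state with a plain map; `CountStateSketch` and its methods are hypothetical and are not the `TriggerTestHarness` or `CountTrigger` API, and windows are written as `"start-end"` strings only for brevity.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-window count state, as a count-based trigger might keep it.
final class CountStateSketch {
    private final Map<String, Integer> counts = new HashMap<>();

    /** Records one element for the given window. */
    void add(String window) {
        counts.merge(window, 1, Integer::sum);
    }

    /** Folds the counts of all source windows into the target window and drops the sources. */
    void mergeWindows(String target, List<String> sources) {
        int total = 0;
        for (String source : sources) {
            Integer count = counts.remove(source);
            if (count != null) {
                total += count;
            }
        }
        if (total > 0) {
            counts.merge(target, total, Integer::sum);
        }
    }

    int count(String window) {
        return counts.getOrDefault(window, 0);
    }

    int numEntries() {
        return counts.size();
    }
}
```

With three windows `0-2`, `2-4` and `4-6` holding one element each, merging only the first two into `0-4` should leave two state entries, with the untouched `4-6` still at one. Merging `0-2` and `2-4` into a wider `0-10` should leave a single entry with count two.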
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85361208
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+ private static final Integer KEY = 1;
+
+ private final Trigger<T, W> trigger;
+ private final TypeSerializer<W> windowSerializer;
+
+ private final HeapKeyedStateBackend<Integer> stateBackend;
+ private final TestInternalTimerService<Integer, W> internalTimerService;
+
+ public TriggerTestHarness(
+ Trigger<T, W> trigger,
+ TypeSerializer<W> windowSerializer) throws Exception {
+ this.trigger = trigger;
+ this.windowSerializer = windowSerializer;
+
+ // we only ever use one key, other tests make sure that windows work across different
+ // keys
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ @SuppressWarnings("unchecked")
+ HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+ this.stateBackend = stateBackend;
+
+ this.stateBackend.setCurrentKey(0);
+
+ this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
+ @Override
+ public void setCurrentKey(Object key) {
+ // ignore
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return KEY;
+ }
+ });
+ }
+
+ public int numProcessingTimeTimers() {
+ return internalTimerService.numProcessingTimeTimers();
+ }
+
+ public int numProcessingTimeTimers(W window) {
+ return internalTimerService.numProcessingTimeTimers(window);
+ }
+
+ public int numEventTimeTimers() {
+ return internalTimerService.numEventTimeTimers();
+ }
+
+ public int numEventTimeTimers(W window) {
+ return internalTimerService.numEventTimeTimers(window);
+ }
+
+ public int numStateEntries() {
+ return stateBackend.numStateEntries();
+ }
+
+ public int numStateEntries(W window) {
+ return stateBackend.numStateEntries(window);
+ }
+
+ /**
+ * Injects one element into the trigger for the given window and returns the result of
+ * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
+ */
+ public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
+ TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+ KEY,
+ window,
+ internalTimerService,
+ stateBackend,
+ windowSerializer);
+ return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
+ }
+
+ /**
+ * Advances processing time and checks whether we have exactly one firing for the given
+ * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
+ * is returned for that firing.
+ */
+ public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
+ Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
+ }
+
+ Tuple2<W, TriggerResult> firing = firings.iterator().next();
--- End diff --
I don't think this checks EXACTLY one, but only AT LEAST one.
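For clarity, the distinction I mean can be sketched in isolation like this. `FiringChecks` is a hypothetical helper, not part of the harness; it only contrasts the two kinds of check on a plain collection.

```java
import java.util.Collection;

// Hypothetical helper contrasting an "exactly one" check with an "at least one" check.
final class FiringChecks {
    /** Returns the single element; rejects both empty and multi-element collections. */
    static <T> T exactlyOne(Collection<T> firings) {
        if (firings.size() != 1) {
            throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
        }
        return firings.iterator().next();
    }

    /** Returns the first element; rejects only empty collections, extra elements are ignored. */
    static <T> T atLeastOne(Collection<T> firings) {
        if (firings.isEmpty()) {
            throw new IllegalStateException("Must have at least one timer firing.");
        }
        return firings.iterator().next();
    }
}
```

With two firings, `atLeastOne` quietly returns the first one, while `exactlyOne` throws, which is the behaviour the Javadoc promises.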
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85500759
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+ private static final Integer KEY = 1;
+
+ private final Trigger<T, W> trigger;
+ private final TypeSerializer<W> windowSerializer;
+
+ private final HeapKeyedStateBackend<Integer> stateBackend;
+ private final TestInternalTimerService<Integer, W> internalTimerService;
+
+ public TriggerTestHarness(
+ Trigger<T, W> trigger,
+ TypeSerializer<W> windowSerializer) throws Exception {
+ this.trigger = trigger;
+ this.windowSerializer = windowSerializer;
+
+ // we only ever use one key, other tests make sure that windows work across different
+ // keys
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ @SuppressWarnings("unchecked")
+ HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+ this.stateBackend = stateBackend;
+
+ this.stateBackend.setCurrentKey(0);
+
+ this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
+ @Override
+ public void setCurrentKey(Object key) {
+ // ignore
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return KEY;
+ }
+ });
+ }
+
+ public int numProcessingTimeTimers() {
+ return internalTimerService.numProcessingTimeTimers();
+ }
+
+ public int numProcessingTimeTimers(W window) {
+ return internalTimerService.numProcessingTimeTimers(window);
+ }
+
+ public int numEventTimeTimers() {
+ return internalTimerService.numEventTimeTimers();
+ }
+
+ public int numEventTimeTimers(W window) {
+ return internalTimerService.numEventTimeTimers(window);
+ }
+
+ public int numStateEntries() {
+ return stateBackend.numStateEntries();
+ }
+
+ public int numStateEntries(W window) {
+ return stateBackend.numStateEntries(window);
+ }
+
+ /**
+ * Injects one element into the trigger for the given window and returns the result of
+ * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
+ */
+ public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
+ TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
+ KEY,
+ window,
+ internalTimerService,
+ stateBackend,
+ windowSerializer);
+ return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
+ }
+
+ /**
+ * Advances processing time and checks whether we have exactly one firing for the given
+ * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
+ * is returned for that firing.
+ */
+ public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
+ Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
+ }
+
+ Tuple2<W, TriggerResult> firing = firings.iterator().next();
--- End diff --
With the check in the lines above (that checks for the size of `firings`) it should be correct, right?
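To illustrate the point under discussion, here is a minimal, self-contained sketch of the same guard outside of Flink. The class `ExactlyOneFiring` and the helper `exactlyOne` are hypothetical names used only for this illustration; they are not part of the pull request. The sketch suggests that a `size() != 1` check rejects both an empty collection and a collection with several firings, so the subsequent `iterator().next()` is only reached when there is exactly one element.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class ExactlyOneFiring {

    // Hypothetical stand-in for the harness logic: returns the single element,
    // or throws if the collection holds zero or more than one element.
    static <F> F exactlyOne(Collection<F> firings) {
        if (firings.size() != 1) {
            throw new IllegalStateException(
                "Must have exactly one timer firing. Fired timers: " + firings);
        }
        // Only reached when size() == 1, so next() returns the sole firing.
        return firings.iterator().next();
    }

    public static void main(String[] args) {
        // One firing: accepted.
        System.out.println(exactlyOne(Collections.singletonList("FIRE")));

        // Two firings: rejected by the size check before next() is called.
        try {
            exactlyOne(Arrays.asList("FIRE", "FIRE"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Without the size check, `iterator().next()` alone would indeed only guarantee at least one firing.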
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/2572
Yes, I was aware that some configuration/mocking also depends on the time domain. However, what those calls do is essentially equivalent, just for different time domains. So I assume an adapter that just dispatches for the different time domains should not obscure the logic of the tests?
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/2572
My idea would be to introduce an adapter that abstracts the advancing of time, with two concrete implementations that forward to advanceWatermark or onProcessingTime. Or am I missing something that is fundamentally different?
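One possible shape for such an adapter is sketched below, without any Flink dependency. All names here (`TimeDomainAdapterSketch`, `FakeHarness`, `TimeAdapter`, `runSharedScenario`) are hypothetical and exist only for illustration. The fake harness merely records which method was called, and `advanceProcessingTime` is assumed as the processing-time counterpart of `advanceWatermark`; the real harness and tests may look quite different.

```java
import java.util.ArrayList;
import java.util.List;

public class TimeDomainAdapterSketch {

    /** Hypothetical minimal harness; it only records which call was made. */
    static class FakeHarness {
        final List<String> calls = new ArrayList<>();

        void advanceWatermark(long time) {
            calls.add("watermark@" + time);
        }

        void advanceProcessingTime(long time) {
            calls.add("processingTime@" + time);
        }
    }

    /** Hypothetical adapter: the shared test body only sees this interface. */
    interface TimeAdapter {
        void advanceTime(FakeHarness harness, long time);
    }

    // Two concrete adapters that simply dispatch to the matching harness method.
    static final TimeAdapter EVENT_TIME = FakeHarness::advanceWatermark;
    static final TimeAdapter PROCESSING_TIME = FakeHarness::advanceProcessingTime;

    /** Shared test logic, written once and run for either time domain. */
    static List<String> runSharedScenario(TimeAdapter adapter) {
        FakeHarness harness = new FakeHarness();
        adapter.advanceTime(harness, 2);
        adapter.advanceTime(harness, 4);
        return harness.calls;
    }

    public static void main(String[] args) {
        System.out.println(runSharedScenario(EVENT_TIME));
        System.out.println(runSharedScenario(PROCESSING_TIME));
    }
}
```

The test body in `runSharedScenario` never mentions a time domain, which is the deduplication the comment is asking about; whether the domain-specific setup and mocking fit behind the same interface is a separate question.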
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2572
Thanks for your review, @StefanRRichter! You found some areas where this can be improved.
I'm not yet sure whether parameterisation for processing-time/event-time can be done without obscuring what the test actually does too much, but I'll try and come up with something.
---
[GitHub] flink issue #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2572
@StefanRRichter I added more tests (see the commit) and deduplicated processing-time/event-time tests. PTAL (please take another look) 😃
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85360067
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsWindowsTest.java ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link EventTimeSessionWindows}
+ */
+public class EventTimeSessionWindowsWindowsTest extends TestLogger {
+
+ @Test
+ public void testWindowAssignment() {
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000));
--- End diff --
Very minor, but I think a named constant for the value 5000 might make the contract even clearer (e.g. `int gap = 5000`); the window end time could then be written as the window start time `+ gap`.
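A rough sketch of the suggestion, independent of Flink: the class `SessionGapSketch`, the constant `GAP_MS` and the helper `expectedWindowEnd` are hypothetical names for illustration only. It assumes that the session window assigned to a single element starts at the element's timestamp and ends one gap later.

```java
public class SessionGapSketch {

    // Named constant replacing the literal 5000 (milliseconds) in the test.
    static final long GAP_MS = 5000L;

    // Assumption: a fresh session window for one element spans
    // [timestamp, timestamp + gap), so the expected end is derived, not hard-coded.
    static long expectedWindowEnd(long timestamp) {
        return timestamp + GAP_MS;
    }

    public static void main(String[] args) {
        long timestamp = 4999L;
        System.out.println("window: [" + timestamp + ", " + expectedWindowEnd(timestamp) + ")");
    }
}
```

Deriving the end from the start plus the constant keeps the assertions correct if the gap is ever changed.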
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85500442
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EventTimeTrigger}.
+ */
+public class EventTimeTriggerTest {
+
+ /**
+ * Verify that state of separate windows does not leak into other windows.
+ */
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+ // inject several elements
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ assertEquals(0, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(2, testHarness.numEventTimeTimers());
+ assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(2, new TimeWindow(0, 2)));
+
+ assertEquals(0, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(1, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(2, 4)));
+
--- End diff --
The trigger tests only test the isolated behaviour of Triggers; there are no multiple keys here, so that wouldn't work. You're right about the multiple timers, though, so I'll add a test for this to `WindowOperatorTest`, where the timer firing behaviour is tested.
---
[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85497817
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+ /**
+ * Verify that state of separate windows does not leak into other windows.
+ */
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness<Object, TimeWindow> testHarness =
+ new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+ // right now, CountTrigger will clear its state in onElement when firing
+ // ideally, this should be moved to onFire()
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+ }
--- End diff --
Done
---