Posted to issues@flink.apache.org by "yuemeng (JIRA)" <ji...@apache.org> on 2018/04/26 08:57:00 UTC
[jira] [Updated] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window
[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yuemeng updated FLINK-9201:
---------------------------
Summary: same merge window will be fired twice if watermark already passed the merge window (was: same merge window will be fired twice if watermark already passed the new merged window)
> same merge window will be fired twice if watermark already passed the merge window
> ----------------------------------------------------------------------------------
>
> Key: FLINK-9201
> URL: https://issues.apache.org/jira/browse/FLINK-9201
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.3
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Blocker
>
> Consider a sum over event-time session windows. Suppose the session gap is 3 seconds and the allowed lateness is 60 seconds.
> * w1, TimeWindow[1,9], contains the elements 1, 2, 3 and 6; it will be fired when the watermark reaches 9.
> * A late element then arrives that forms w2, TimeWindow[7,10], but the watermark is already at 11.
> * w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered for w3 by calling triggerContext.onMerge(mergedWindows). w3 is fired the first time by triggerContext.onElement(element), because the watermark has already passed w3. w3 is fired a second time because the timer registered on merge is already behind the current watermark, so it fires as soon as the next watermark arrives.
> In other words, w3 is fired twice because the watermark had already passed the new merged window w3 when it was created.
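The sequence above can be reproduced without Flink in a small toy model. The class below is a hypothetical sketch, not Flink code: it tracks a single session for one key, uses half-open windows [start, end) with maxTimestamp = end - 1 (the convention of Flink's TimeWindow), gives every element a value of 1 (an assumption; the report does not state values), and paraphrases the trigger behaviour described above: onElement fires when the watermark has passed the window and otherwise registers a timer, while onMerge registers a timer for the merged window. The `guardMergeTimer` flag is an illustrative assumption modelling one possible guard that skips the merge-time timer when the watermark has already passed the merged window, mirroring the check in onElement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model, NOT Flink code. Keeps only the latest session for a single key,
// which is enough for the scenario in this report.
final class SessionDoubleFireModel {
    private final long gap;
    private final long allowedLateness;
    // Hypothetical switch: when true, onMerge only registers a timer if the
    // merged window's max timestamp is still ahead of the watermark.
    private final boolean guardMergeTimer;

    private long watermark = Long.MIN_VALUE;
    private long start, end;              // current session window [start, end)
    private boolean hasWindow = false;
    private long sum = 0;
    private final TreeSet<Long> timers = new TreeSet<>();  // event-time timers, deduplicated
    private final List<String> firings = new ArrayList<>();

    SessionDoubleFireModel(long gap, long allowedLateness, boolean guardMergeTimer) {
        this.gap = gap;
        this.allowedLateness = allowedLateness;
        this.guardMergeTimer = guardMergeTimer;
    }

    private long maxTimestamp() { return end - 1; }

    void processElement(long ts, long value) {
        long newStart = ts, newEnd = ts + gap;
        // Overlapping or touching windows are merged (same test as TimeWindow.intersects).
        boolean merge = hasWindow && start <= newEnd && end >= newStart;
        long mStart = merge ? Math.min(start, newStart) : newStart;
        long mEnd = merge ? Math.max(end, newEnd) : newEnd;
        if (mEnd - 1 + allowedLateness <= watermark) {
            return; // beyond allowed lateness: drop the element
        }
        if (merge) {
            timers.remove(maxTimestamp()); // clear the timer of the window merged away
        }
        start = mStart;
        end = mEnd;
        hasWindow = true;
        sum += value;
        // onMerge: register a timer for the merged window (unconditionally unless guarded).
        if (merge && (!guardMergeTimer || maxTimestamp() > watermark)) {
            timers.add(maxTimestamp());
        }
        // onElement: fire now if the watermark has passed the window, else wait for a timer.
        if (maxTimestamp() <= watermark) {
            fire();
        } else {
            timers.add(maxTimestamp());
        }
    }

    void processWatermark(long wm) {
        watermark = wm;
        while (!timers.isEmpty() && timers.first() <= wm) {
            long t = timers.pollFirst();
            if (hasWindow && t == maxTimestamp()) {
                fire(); // onEventTime for the current window
            }
        }
    }

    private void fire() {
        firings.add("[" + start + "," + end + ") sum=" + sum);
    }

    // The scenario from the description: gap 3, allowed lateness 60.
    static List<String> runScenario(boolean guardMergeTimer) {
        SessionDoubleFireModel m = new SessionDoubleFireModel(3, 60, guardMergeTimer);
        for (long ts : new long[] {1, 2, 3, 6}) {
            m.processElement(ts, 1);
        }
        m.processWatermark(9);   // w1 = [1,9) fires once via its timer
        m.processWatermark(11);
        m.processElement(7, 1);  // late element: w2 = [7,10) merges into w3 = [1,10)
        m.processWatermark(12);  // any timer registered on merge is now due
        return m.firings;
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + runScenario(false));
        System.out.println("guarded:   " + runScenario(true));
    }
}
```

With the flag off, the merged window [1,10) is reported twice, once from onElement and once from the merge-time timer at the next watermark; with the flag on, it is reported once. Whether an upstream fix takes exactly this shape is not established by this report.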
> Examples
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
> closeCalled.set(0);
> final int sessionSize = 3;
> TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
> ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
> inputType.createSerializer(new ExecutionConfig()));
> WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
> EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
> new TimeWindow.Serializer(),
> new TupleKeySelector(),
> BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
> stateDesc,
> new InternalIterableWindowFunction<>(new SessionWindowFunction()),
> EventTimeTrigger.create(),
> 60000,
> null /* late data output tag */);
> OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
> createTestHarness(operator);
> ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
> testHarness.open();
> // add elements out-of-order
> testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
> testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
> testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
> testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
> testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
> testHarness.processWatermark(new Watermark(5500));
> expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
> expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
> expectedOutput.add(new Watermark(5500));
> // do a snapshot, close and restore again
> OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
> TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
> testHarness.close();
> testHarness = createTestHarness(operator);
> testHarness.setup();
> testHarness.initializeState(snapshot);
> testHarness.open();
> expectedOutput.clear();
> // suppose the watermark has already reached 10000
> testHarness.processWatermark(new Watermark(10000));
> // a late element with timestamp 4500 arrives; the new session window [0, 7500] is still valid because its cleanup time (max timestamp + allowed lateness) is later than the watermark,
> // so it is fired immediately
> testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));
> expectedOutput.add(new Watermark(10000));
> expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
> // when a new watermark arrives, the same TimeWindow [0, 7500] is fired again, because a new timer was registered by triggerContext.onMerge
> testHarness.processWatermark(new Watermark(11000));
> expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
> expectedOutput.add(new Watermark(11000));
> TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
> testHarness.close();
> }
> {code}
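The expected values in the test follow from plain session-window arithmetic. The sketch below is plain Java, not Flink's implementation, and the class and method names are hypothetical. It merges elements whose [timestamp, timestamp + gap) ranges overlap or touch, sums their values, and applies the lateness rule the test comment relies on, assuming a window counts as late once its max timestamp plus the allowed lateness is at or below the watermark.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java arithmetic (not Flink code) for the session windows used in the test.
final class SessionArithmetic {

    static final class Session {
        long start, end, sum;   // half-open window [start, end) and the summed values
        Session(long start, long end, long sum) { this.start = start; this.end = end; this.sum = sum; }
        long maxTimestamp() { return end - 1; }
        @Override public String toString() { return "[" + start + "," + end + ") sum=" + sum; }
    }

    /** Merges {timestamp, value} elements into sessions; each element spans [ts, ts + gap). */
    static List<Session> sessions(List<long[]> elements, long gap) {
        List<long[]> sorted = new ArrayList<>(elements);
        sorted.sort(Comparator.comparingLong(e -> e[0]));
        List<Session> out = new ArrayList<>();
        for (long[] e : sorted) {
            Session last = out.isEmpty() ? null : out.get(out.size() - 1);
            if (last != null && e[0] <= last.end) {   // overlapping or touching: merge
                last.end = Math.max(last.end, e[0] + gap);
                last.sum += e[1];
            } else {
                out.add(new Session(e[0], e[0] + gap, e[1]));
            }
        }
        return out;
    }

    /** Assumed lateness rule: late once maxTimestamp + allowedLateness <= watermark. */
    static boolean isLate(Session s, long allowedLateness, long watermark) {
        return s.maxTimestamp() + allowedLateness <= watermark;
    }

    /** The key2 elements from the test, optionally including the late element at 4500. */
    static List<long[]> key2Elements(boolean withLateElement) {
        List<long[]> els = new ArrayList<>();
        els.add(new long[] {0, 1});
        els.add(new long[] {1000, 2});
        els.add(new long[] {2500, 3});
        if (withLateElement) {
            els.add(new long[] {4500, 6});
        }
        return els;
    }

    public static void main(String[] args) {
        long gap = 3000, lateness = 60000, watermark = 10000;
        Session merged = sessions(key2Elements(true), gap).get(0);
        System.out.println("key2 before late element: " + sessions(key2Elements(false), gap));
        System.out.println("key2 after late element:  " + merged);
        System.out.println("late? " + isLate(merged, lateness, watermark)
                + ", fires on element? " + (merged.maxTimestamp() <= watermark));
    }
}
```

For key2 this yields [0,5500) with sum 6 before the late element and [0,7500) with sum 12 after it, matching the `key2-6` and `key2-12` records. Since 7499 + 60000 is far above the watermark of 10000, the merged window is not late, while 7499 itself is already below the watermark, so onElement fires it immediately.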
>
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)