Posted to issues@flink.apache.org by "yuemeng (JIRA)" <ji...@apache.org> on 2018/04/26 09:41:00 UTC

[jira] [Comment Edited] (FLINK-9201) same merge window will be fired twice if watermark already passed the merge window

    [ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453759#comment-16453759 ] 

yuemeng edited comment on FLINK-9201 at 4/26/18 9:40 AM:
---------------------------------------------------------

Hi [~till.rohrmann],

Could you help me check this issue?

Thanks a lot.


was (Author: yuemeng):
hi [~till.rohrmann]

can you check this issue?

> same merge window will be fired twice if watermark already passed the merge window
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-9201
>                 URL: https://issues.apache.org/jira/browse/FLINK-9201
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.3.3
>            Reporter: yuemeng
>            Assignee: yuemeng
>            Priority: Blocker
>
> Consider a sum over session windows, with a session gap of 3 seconds and an allowed lateness of 60 seconds.
>  * w1, TimeWindow[1,9], contains elements 1, 2, 3, 6 and fires when the watermark reaches 9.
>  * A late element (w2, TimeWindow[7,10]) arrives when the watermark is already at 11.
>  * w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by the call to triggerContext.onMerge(mergedWindows). w3 fires a first time in the call to triggerContext.onElement(element) because the watermark has already passed w3. w3 then fires a second time because the timer is earlier than the current watermark.
> This means w3 is fired twice because the watermark has already passed the newly merged window w3.
> Example:
> {code:java}
> @Test
> @SuppressWarnings("unchecked")
> public void testSessionWindowsFiredTwice() throws Exception {
>  closeCalled.set(0);
>  final int sessionSize = 3;
>  TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
>  ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
>  inputType.createSerializer(new ExecutionConfig()));
>  WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
>  EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
>  new TimeWindow.Serializer(),
>  new TupleKeySelector(),
>  BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
>  stateDesc,
>  new InternalIterableWindowFunction<>(new SessionWindowFunction()),
>  EventTimeTrigger.create(),
>  60000,
>  null /* late data output tag */);
>  OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
>  createTestHarness(operator);
>  ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
>  testHarness.open();
>  // add elements out-of-order
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
>  testHarness.processWatermark(new Watermark(5500));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
>  expectedOutput.add(new Watermark(5500));
>  // do a snapshot, close and restore again
>  OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
>  testHarness = createTestHarness(operator);
>  testHarness.setup();
>  testHarness.initializeState(snapshot);
>  testHarness.open();
>  expectedOutput.clear();
>  // Suppose the watermark has already reached 10000
>  testHarness.processWatermark(new Watermark(10000));
>  // A late element with timestamp 4500 arrives. The new session window [0, 7500] is still valid because its max timestamp is before the cleanup time,
>  // so it fires immediately
>  testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));
>  expectedOutput.add(new Watermark(10000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
>  // When a new watermark arrives, the same TimeWindow[0, 7500] fires again because a new timer was registered by the call to triggerOnMerge
>  testHarness.processWatermark(new Watermark(11000));
>  expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
>  expectedOutput.add(new Watermark(11000));
>  TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
>  testHarness.close();
> }
> {code}
>  
>  
>  
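
The firing sequence described in the quoted issue can be reproduced with a small stand-alone model. This is only a sketch under assumptions: `MergeFireSketch` and all of its methods are hypothetical names, it does not use Flink classes, and it uses the window end directly as the timer time. It models a merge step that always registers a timer and an element step that fires immediately when the watermark has already passed the window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model of the scenario in the issue; NOT Flink's implementation.
class MergeFireSketch {
    private long watermark = Long.MIN_VALUE;
    private final TreeSet<Long> timers = new TreeSet<>();
    final List<String> firings = new ArrayList<>();

    // Assumed merge behaviour: a timer for the merged window is registered
    // unconditionally, even if the watermark is already past it.
    void onMerge(long windowEnd) {
        timers.add(windowEnd);
    }

    // Assumed element behaviour: fire at once if the watermark has already
    // passed the window, otherwise register a timer.
    void onElement(long windowEnd) {
        if (windowEnd <= watermark) {
            firings.add("onElement fired window ending at " + windowEnd);
        } else {
            timers.add(windowEnd);
        }
    }

    // Advancing the watermark fires every timer that is not after it.
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long t = timers.pollFirst();
            firings.add("timer fired window ending at " + t);
        }
    }

    // Replays the steps from the description: watermark at 11, then the late
    // element merges w1 and w2 into w3 = [1,10], then the next watermark arrives.
    static List<String> run() {
        MergeFireSketch s = new MergeFireSketch();
        s.advanceWatermark(11);
        s.onMerge(10);           // registers a timer that is already in the past
        s.onElement(10);         // first firing of w3
        s.advanceWatermark(12);  // second firing of w3, from the stale timer
        return s.firings;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the merged window is reported twice, once from the element path and once from the timer path, which matches the duplicate "key2-12" record expected by the test in the issue.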



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)