Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/06/11 21:52:09 UTC

[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2093

    [FLINK-3714] Add Support for "Allowed Lateness"

    Allows the user to specify an "allowed lateness" for elements before they are dropped as late arrivals.
    In addition, it defines a "clean up" time for the window state, which is the end of the window plus the
    aforementioned allowed lateness. When the watermark or the processing time (depending on the window
    assigner) passes this mark, the window state is purged/cleaned.

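    A minimal sketch of the two rules described above, assuming a window is identified by its inclusive `maxTimestamp()` (end minus one, as for Flink's `TimeWindow`) and that all times are in milliseconds. `LatenessRules` and its methods are hypothetical names, not the code in this PR. Saturating at `Long.MAX_VALUE` is an added precaution for windows whose end is close to the maximum timestamp.

    ```java
    /** Hypothetical helper illustrating allowed lateness; not Flink's WindowOperator code. */
    final class LatenessRules {

        private LatenessRules() {}

        /**
         * Cleanup time = end of the window (its inclusive max timestamp) plus the allowed lateness.
         * Assumes allowedLateness >= 0 and saturates at Long.MAX_VALUE instead of overflowing.
         */
        static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
            long cleanup = windowMaxTimestamp + allowedLateness;
            // On overflow the sum wraps around to a value smaller than the window end.
            return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
        }

        /**
         * An element is dropped as a late arrival once the watermark has reached
         * the cleanup time of the window it falls into.
         */
        static boolean isLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
            return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
        }
    }
    ```

    With the numbers used in `testLateness()` (2-second tumbling windows, 500 ms of lateness), the window [0, 2000) has `maxTimestamp()` 1999 and cleanup time 2499: an element for it is still accepted at watermark 2300 but dropped at watermark 6000.
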
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink lateness_n_gc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2093.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2093
    
----
commit be294b6dde8c830bde6d6944c403187d366f1c80
Author: kl0u <kk...@gmail.com>
Date:   2016-05-31T15:13:58Z

    [FLINK-3714] Add Support for "Allowed Lateness"
    
    Handle late elements and take care
    of cleaning the window state.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66787046
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
    @@ -121,6 +124,23 @@ public WindowedStream(KeyedStream<T, K> input,
     	}
     
     	/**
    +	 * Sets the allowed lateness. If the {@link WindowAssigner} used
    +	 * is in processing time, then the allowed lateness is set to 0.
    +	 */
    +	@PublicEvolving
    +	public WindowedStream<T, K, W> setAllowedLateness(Time lateness) {
    +		long millis = lateness.toMilliseconds();
    +		if (allowedLateness < 0) {
    +			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
    +		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
    +			this.allowedLateness = 0;
    --- End diff --
    
    We probably should log a warning here and not silently set to zero.



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66789581
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
    @@ -121,6 +124,23 @@ public WindowedStream(KeyedStream<T, K> input,
     	}
     
     	/**
    +	 * Sets the allowed lateness. If the {@link WindowAssigner} used
    +	 * is in processing time, then the allowed lateness is set to 0.
    +	 */
    +	@PublicEvolving
    +	public WindowedStream<T, K, W> setAllowedLateness(Time lateness) {
    +		long millis = lateness.toMilliseconds();
    +		if (allowedLateness < 0) {
    +			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
    +		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
    +			this.allowedLateness = 0;
    --- End diff --
    
    Yeah, but it would throw right when the job is specified, not while it is running. LOG is also fine, though. 😃



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r67155650
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
    @@ -124,16 +124,17 @@ public WindowedStream(KeyedStream<T, K> input,
     	}
     
     	/**
    -	 * Sets the allowed lateness. If the {@link WindowAssigner} used
    +	 * Sets the allowed lateness. By default this is 0. If the {@link WindowAssigner} used
    --- End diff --
    
    The Javadoc is out of date with respect to the new exception-throwing behavior. For the text of the exception I would prefer something like "Setting the allowed lateness is only valid for event-time windows.". Precise and simple. 😃



[GitHub] flink issue #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2093
  
    I had some inline comments.
    
    One other thing I would like to see changed is moving the logic that checks whether to do cleanup from `cleanup()` to the call-site. So in `processElement()` it would be:
    ```
    if (combinedTriggerResult.isPurge()) {
      cleanup(...);
    } else {
      registerCleanupTimer(...);
    }
    ```
    The reason for this is that the check in `cleanup()` for whether a timer is a cleanup timer does not take into account the event-time/processing-time split: if a processing-time timer fires at exactly the cleanup time of an event-time window, this would also trigger cleanup. The code in `processTriggersFor` (which, by the way, can be moved back into `processWatermark()` now that you refactored this special event-time trigger hack) would have this check:
    ```
    if (assigner.isEventTime() && isCleanupTimer(...))
    ```
    while the code in `trigger()` would be
    ```
    if (!assigner.isEventTime() && isCleanupTimer(...))
    ```
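
    A hypothetical sketch of the check that would sit at the two call-sites (`processWatermark()` for event-time timers, `trigger()` for processing-time timers). `CleanupGate`, `TimeDomain`, and the exact-equality test for a cleanup timer are assumptions made for illustration; the point is that a timer only counts as a cleanup timer when it comes from the same time domain as the window assigner.

    ```java
    /** Hypothetical model of the cleanup decision; not the actual WindowOperator code. */
    final class CleanupGate {

        enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

        private CleanupGate() {}

        /** A timer is the window's cleanup timer if it fires exactly at the cleanup time. */
        static boolean isCleanupTimer(long cleanupTime, long timerTimestamp) {
            return timerTimestamp == cleanupTime;
        }

        /**
         * Clean up if the trigger purged, or if the timer that fired belongs to the
         * assigner's own time domain and lands on the cleanup time.
         */
        static boolean shouldCleanup(boolean triggerPurged,
                                     boolean assignerIsEventTime,
                                     TimeDomain firedTimerDomain,
                                     long cleanupTime,
                                     long timerTimestamp) {
            if (triggerPurged) {
                return true;
            }
            boolean sameDomain = assignerIsEventTime == (firedTimerDomain == TimeDomain.EVENT_TIME);
            return sameDomain && isCleanupTimer(cleanupTime, timerTimestamp);
        }
    }
    ```

    For an event-time window with cleanup time 2499, a processing-time timer that happens to fire at 2499 returns `false` here, while the event-time timer at 2499 returns `true`.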




[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66787298
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java ---
    @@ -36,12 +36,17 @@ private EventTimeTrigger() {}
     	@Override
     	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
     		ctx.registerEventTimeTimer(window.maxTimestamp());
    -		return TriggerResult.CONTINUE;
    +
    +		return (timestamp < ctx.getCurrentWatermark()) ?
    --- End diff --
    
    This should be `window.maxTimestamp() <= ctx.getCurrentWatermark()`. If the tests pass with both versions, they need to be adapted so that they fail with the current code and pass with the new code.

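    A hypothetical model of the two checks, assuming the truncated ternary in the diff returns `FIRE` when its condition holds and `CONTINUE` otherwise (the diff is cut off, so this is a guess). `OnElementCheck` and `Result` are illustrative names, not Flink's `EventTimeTrigger` or `TriggerResult`.

    ```java
    /** Hypothetical model of what onElement() returns; not Flink's EventTimeTrigger. */
    final class OnElementCheck {

        enum Result { CONTINUE, FIRE }

        private OnElementCheck() {}

        /** Check as in the diff: compares the element's own timestamp with the watermark. */
        static Result fromElementTimestamp(long elementTimestamp, long currentWatermark) {
            return elementTimestamp < currentWatermark ? Result.FIRE : Result.CONTINUE;
        }

        /** Suggested check: fire only if the whole window is already behind the watermark. */
        static Result fromWindowEnd(long windowMaxTimestamp, long currentWatermark) {
            return windowMaxTimestamp <= currentWatermark ? Result.FIRE : Result.CONTINUE;
        }
    }
    ```

    An element with timestamp 1300 that arrives after watermark 1500, as in `testLateness()`, falls into the window [0, 2000): under this model the diff's check fires that window early, while the suggested check keeps waiting until the watermark reaches 1999. A case like this is what lets a test tell the two versions apart.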


[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66787353
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
    @@ -121,6 +124,23 @@ public WindowedStream(KeyedStream<T, K> input,
     	}
     
     	/**
    +	 * Sets the allowed lateness. If the {@link WindowAssigner} used
    +	 * is in processing time, then the allowed lateness is set to 0.
    +	 */
    +	@PublicEvolving
    +	public WindowedStream<T, K, W> setAllowedLateness(Time lateness) {
    +		long millis = lateness.toMilliseconds();
    +		if (allowedLateness < 0) {
    +			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
    +		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
    +			this.allowedLateness = 0;
    --- End diff --
    
    I agree, but we do not have a logger here yet.



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66789406
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
    @@ -121,6 +124,23 @@ public WindowedStream(KeyedStream<T, K> input,
     	}
     
     	/**
    +	 * Sets the allowed lateness. If the {@link WindowAssigner} used
    +	 * is in processing time, then the allowed lateness is set to 0.
    +	 */
    +	@PublicEvolving
    +	public WindowedStream<T, K, W> setAllowedLateness(Time lateness) {
    +		long millis = lateness.toMilliseconds();
    +		if (allowedLateness < 0) {
    +			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
    +		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
    +			this.allowedLateness = 0;
    --- End diff --
    
    LOG is more natural. There is no reason to stop the job for the user (as it is not an error). We should just tell him that
    it does not make much sense, so that he knows how to interpret the results he gets. Throwing an exception is somewhat more drastic.



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r67154730
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -131,8 +131,10 @@
     	protected final TypeSerializer<W> windowSerializer;
     
     	/**
    -	 * The allowed lateness for elements.
    -	 * By default this is set to {@code 0}.
    +	 * The allowed lateness for elements. This is used for:
    +	 * <li> Deciding if an element should be dropped from a window due to lateness. </li>
    --- End diff --
    
    I think you need a `<ul>` around this whole list. And the closing `</li>` is not used in Javadoc, I think.



[GitHub] flink issue #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2093
  
    Thanks for merging it @aljoscha ! I am closing this PR and the issue related to it.



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66788457
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
    @@ -121,6 +124,23 @@ public WindowedStream(KeyedStream<T, K> input,
     	}
     
     	/**
    +	 * Sets the allowed lateness. If the {@link WindowAssigner} used
    +	 * is in processing time, then the allowed lateness is set to 0.
    +	 */
    +	@PublicEvolving
    +	public WindowedStream<T, K, W> setAllowedLateness(Time lateness) {
    +		long millis = lateness.toMilliseconds();
    +		if (allowedLateness < 0) {
    +			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
    +		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
    +			this.allowedLateness = 0;
    --- End diff --
    
    Then we can add a `LOG`, or maybe we should even throw an exception. What do you think?



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r67339362
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
    @@ -917,6 +929,465 @@ public void testRestoreAndSnapshotAreInSync() throws Exception {
     		Assert.assertEquals(operator.processingTimeTimerTimestamps, otherOperator.processingTimeTimerTimestamps);
     	}
     
    +	@Test
    +	public void testLateness() throws Exception {
    +		final int WINDOW_SIZE = 2;
    +		final long LATENESS = 500;
    +
    +		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
    +
    +		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
    +			new SumReducer(),
    +			inputType.createSerializer(new ExecutionConfig()));
    +
    +		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
    +			new WindowOperator<>(
    +				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
    +				new TimeWindow.Serializer(),
    +				new TupleKeySelector(),
    +				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
    +				stateDesc,
    +				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
    +				EventTimeTrigger.create(),
    +				LATENESS);
    +
    +		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(operator);
    +
    +		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
    +
    +		operator.setInputType(inputType, new ExecutionConfig());
    +		testHarness.open();
    +
    +		long initialTime = 0L;
    +		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 500));
    +		testHarness.processWatermark(new Watermark(initialTime + 1500));
    +
    +		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1300));
    +
    +		testHarness.processWatermark(new Watermark(initialTime + 2300));
    +
    +		// this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark
    +		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1997));
    +		testHarness.processWatermark(new Watermark(initialTime + 6000));
    +
    +		// this will be dropped because window.maxTimestamp() + allowedLateness < currentWatermark
    +		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
    +		testHarness.processWatermark(new Watermark(initialTime + 7000));
    +
    +		Tuple2<String, Integer> el1 = new Tuple2<>("key2", 2);
    +		// the following is 1 and not 3 because the trigger fires and purges.
    +		Tuple2<String, Integer> el2 = new Tuple2<>("key2", 1);
    +
    +		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
    +
    +		expected.add(new Watermark(initialTime + 1500));
    +		expected.add(new StreamRecord<>(el1, initialTime + 1999));
    +
    +		expected.add(new Watermark(initialTime + 2300));
    +		expected.add(new StreamRecord<>(el2, initialTime + 1999));
    +
    +		expected.add(new Watermark(initialTime + 6000));
    +		expected.add(new Watermark(initialTime + 7000));
    +
    +		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testDropDueToLatenessTumbling() throws Exception {
    --- End diff --
    
    No element is dropped in this test, if I'm not mistaken.



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/2093



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66794085
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -551,19 +657,28 @@ public void registerProcessingTimeTimer(long time) {
     			}
     		}
     
    +		void registerCleanupTimer(long time) {
    --- End diff --
    
    The logic for these two (`register/deleteCleanupTimer`) should be moved directly into `WindowOperator`.
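
    A hypothetical sketch of cleanup-timer handling owned by the operator itself rather than by the trigger context. `CleanupTimers` and `RecordingTimerService` are illustrative stand-ins (the real operator would use its own timer registration), and the saturating cleanup time is an added precaution. The timer goes into the time domain of the window assigner, so registration and deletion always agree.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical operator fragment; not the actual WindowOperator code. */
    final class CleanupTimers {

        /** Minimal stand-in for a timer service; it only records registered timestamps. */
        static final class RecordingTimerService {
            final List<Long> eventTimeTimers = new ArrayList<>();
            final List<Long> processingTimeTimers = new ArrayList<>();
        }

        private final boolean assignerIsEventTime; // stands in for windowAssigner.isEventTime()
        private final long allowedLateness;
        private final RecordingTimerService timers;

        CleanupTimers(boolean assignerIsEventTime, long allowedLateness, RecordingTimerService timers) {
            this.assignerIsEventTime = assignerIsEventTime;
            this.allowedLateness = allowedLateness;
            this.timers = timers;
        }

        private long cleanupTime(long windowMaxTimestamp) {
            long cleanup = windowMaxTimestamp + allowedLateness;
            return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE; // saturate on overflow
        }

        /** Picks the timer list that matches the assigner's time domain. */
        private List<Long> timersForDomain() {
            return assignerIsEventTime ? timers.eventTimeTimers : timers.processingTimeTimers;
        }

        void registerCleanupTimer(long windowMaxTimestamp) {
            timersForDomain().add(cleanupTime(windowMaxTimestamp));
        }

        void deleteCleanupTimer(long windowMaxTimestamp) {
            // Box explicitly so List.remove(Object) is called, not remove(int index).
            timersForDomain().remove(Long.valueOf(cleanupTime(windowMaxTimestamp)));
        }
    }
    ```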



[GitHub] flink issue #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2093
  
    The changes look very good now! The refactoring of the cleanup and firing makes it very easy to see what's going on. I just had some inline comments about outdated Javadoc and exception strings.
    
    I still have to read the tests in depth, I'm doing that now.



[GitHub] flink issue #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2093
  
    Could you please update the tests to add elements to `expectedOutput` in line with the `processElement` and `processWatermark` calls on the test harness? In the existing tests I did this, and it makes it easy to reason about what output should be produced in sequence. If the expected elements/watermarks are added after the other calls, you have to keep jumping back and forth when reading the test. Also, could you replace the `el1`, `el2`, and so on variables with inline tuple creation, for the same reason of not having to jump back and forth when reading the tests?

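    A toy, self-contained sketch of the requested style, assuming a single key, a sum aggregate, and a trigger that fires and purges (as the expected values in `testLateness()` imply). `ToyLatenessWindows` is hypothetical and only mimics the scenario of that test without Flink's test harness; since every firing purges, it does not model cleanup timers. Each expected record is appended right after the call that produces it, and the whole sequence is then compared exactly.

    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    /** Toy single-key model of tumbling event-time windows with allowed lateness; not WindowOperator. */
    final class ToyLatenessWindows {

        private final long size;
        private final long allowedLateness;
        private final TreeMap<Long, Integer> sumsByWindowStart = new TreeMap<>();
        private long watermark = Long.MIN_VALUE;
        final List<String> output = new ArrayList<>();

        ToyLatenessWindows(long size, long allowedLateness) {
            this.size = size;
            this.allowedLateness = allowedLateness;
        }

        void processElement(int value, long timestamp) {
            long start = timestamp - Math.floorMod(timestamp, size);
            long maxTimestamp = start + size - 1;
            if (maxTimestamp + allowedLateness <= watermark) {
                return; // dropped: the watermark already passed the window's cleanup time
            }
            sumsByWindowStart.merge(start, value, Integer::sum);
            if (maxTimestamp <= watermark) {
                // late, but within the allowed lateness: fire and purge right away
                output.add("sum=" + sumsByWindowStart.remove(start) + "@" + maxTimestamp);
            }
        }

        void processWatermark(long newWatermark) {
            watermark = newWatermark;
            Iterator<Map.Entry<Long, Integer>> it = sumsByWindowStart.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> window = it.next();
                long maxTimestamp = window.getKey() + size - 1;
                if (maxTimestamp > watermark) {
                    break; // sorted by start, so later windows are not complete either
                }
                output.add("sum=" + window.getValue() + "@" + maxTimestamp); // fire ...
                it.remove();                                                  // ... and purge
            }
            output.add("watermark=" + watermark); // window results are emitted before the watermark
        }

        /** The testLateness() scenario, with the expected output built inline with each call. */
        static boolean scenarioMatches() {
            ToyLatenessWindows op = new ToyLatenessWindows(2000, 500);
            List<String> expected = new ArrayList<>();

            op.processElement(1, 500);
            op.processWatermark(1500);
            expected.add("watermark=1500");

            op.processElement(1, 1300);
            op.processWatermark(2300);
            expected.add("sum=2@1999");
            expected.add("watermark=2300");

            op.processElement(1, 1997); // late, but 1999 + 500 > 2300: fired right away
            expected.add("sum=1@1999");
            op.processWatermark(6000);
            expected.add("watermark=6000");

            op.processElement(1, 1998); // 1999 + 500 <= 6000: dropped, no output
            op.processWatermark(7000);
            expected.add("watermark=7000");

            return expected.equals(op.output); // exact, ordered comparison
        }
    }
    ```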


[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r67313976
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -132,9 +132,9 @@
     
     	/**
     	 * The allowed lateness for elements. This is used for:
    -	 * <li> Deciding if an element should be dropped from a window due to lateness. </li>
    -	 * <li> Clearing the state of a window if the system time passes
    -	 * the {@code window.maxTimestamp + allowedLateness} landmark. </li>
    +	 * <ul> Deciding if an element should be dropped from a window due to lateness.</ul>
    --- End diff --
    
    Ah, I meant something like this:
    ```
    <ul>
      <li> item 1
      <li> item 2
    </ul>
    ```
    
    The Javadoc syntax is almost the same as HTML; only the closing `</li>` tags are left out.



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r67156546
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -450,17 +436,140 @@ public final void trigger(long time) throws Exception {
     				context.key = timer.key;
     				context.window = timer.window;
     				setKeyContext(timer.key);
    +
    +				AppendingState<IN, ACC> windowState;
    +				MergingWindowSet<W> mergingWindows = null;
    +
    +				if (windowAssigner instanceof MergingWindowAssigner) {
    +					mergingWindows = getMergingWindowSet();
    +					W stateWindow = mergingWindows.getStateWindow(context.window);
    +					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
    +				} else {
    +					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
    +				}
    +
     				TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
    -				processTriggerResult(triggerResult, context.window);
    +				fireOrContinue(triggerResult, context.window, windowState);
    +
    +				if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
    +					cleanup(timer.window, windowState, mergingWindows);
    +				}
    +
     			} else {
     				fire = false;
     			}
     		} while (fire);
    +	}
    +
    +	/**
    +	 * Cleans up the window state if the provided {@link TriggerResult} requires so, or if it
    +	 * is time to do so (see {@link #isCleanupTime(Window, long)}). The caller must ensure that the
    +	 * correct key is set in the state backend and the context object.
    +	 */
    +	private void cleanup(W window,
    +						AppendingState<IN, ACC> windowState,
    +						MergingWindowSet<W> mergingWindows) throws Exception {
    +		windowState.clear();
    +		if (mergingWindows != null) {
    +			mergingWindows.retireWindow(window);
    +		}
    +		context.clear();
    +		deleteCleanupTimer(window);
    +	}
    +
    +	/**
    +	 * Triggers the window computation if the provided {@link TriggerResult} requires so.
    +	 * The caller must ensure that the correct key is set in the state backend and the context object.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	private void fireOrContinue(TriggerResult triggerResult,
    +								W window,
    +								AppendingState<IN, ACC> windowState) throws Exception {
    +		if (!triggerResult.isFire()) {
    +			return;
    +		}
    +
    +		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    +		ACC contents = windowState.get();
    +		userFunction.apply(context.key, context.window, contents, timestampedCollector);
    +	}
    +
    +	/**
    +	 * Retrieves the {@link MergingWindowSet} for the currently active key.
    +	 * The caller must ensure that the correct key is set in the state backend.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
    +		MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
    +		if (mergingWindows == null) {
    +			// try to retrieve from state
     
    -		// Also check any watermark timers. We might have some in here since
    -		// Context.registerEventTimeTimer sets a trigger if an event-time trigger is registered
    -		// that is already behind the watermark.
    -		processTriggersFor(new Watermark(currentWatermark));
    +			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
    +			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
    +			ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
    +
    +			mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
    +			mergeState.clear();
    +
    +			mergingWindowsByKey.put((K) getStateBackend().getCurrentKey(), mergingWindows);
    +		}
    +		return mergingWindows;
    +	}
    +
    +	/**
    +	 * This method decides if a window is currently active, or not, based on the current
    --- End diff --
    
    I would prefer this to start with "Decides if a window is late or not, based on ...". It is not clear what `active` means here, and `late` describes pretty well what the method does.



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66787877
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -264,7 +274,9 @@ public void dispose() {
     	@Override
     	@SuppressWarnings("unchecked")
     	public void processElement(StreamRecord<IN> element) throws Exception {
    -		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
    +
    +		Collection<W> elementWindows = windowAssigner.
    --- End diff --
    
    Here I'm just being nitpicky, but the code looks strange if broken into lines like this. To me
    ```
    Collection<W> elementWindows =
        windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
    ```
    or
    ```
    Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(),
        element.getTimestamp()); 
    ```
    
    seem more elegant.



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66786965
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
    @@ -121,6 +124,23 @@ public WindowedStream(KeyedStream<T, K> input,
     	}
     
     	/**
    +	 * Sets the allowed lateness. If the {@link WindowAssigner} used
    +	 * is in processing time, then the allowed lateness is set to 0.
    +	 */
    +	@PublicEvolving
    +	public WindowedStream<T, K, W> setAllowedLateness(Time lateness) {
    --- End diff --
    
    This should follow the same pattern as the other methods (`trigger()`, `evictor()`...), i.e. `allowedLateness(Time)`.
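
    A minimal sketch combining the suggestions in this thread: the `allowedLateness(...)` name, a check on the argument, and the exception text proposed earlier. It assumes `java.time.Duration` in place of Flink's `Time` and a boolean in place of `windowAssigner.isEventTime()`; `LatenessConfig` is a hypothetical class, not `WindowedStream`. Note that the quoted diff computes `millis` but then tests the field `allowedLateness`, which still holds the previous value; the sketch checks the argument instead.

    ```java
    import java.time.Duration;

    /** Hypothetical stand-in for WindowedStream, reduced to the lateness setting. */
    final class LatenessConfig {

        private final boolean eventTimeAssigner; // stands in for windowAssigner.isEventTime()
        private long allowedLateness = 0L;       // default: no lateness

        LatenessConfig(boolean eventTimeAssigner) {
            this.eventTimeAssigner = eventTimeAssigner;
        }

        /** Named like trigger()/evictor() and returns this for chaining. */
        LatenessConfig allowedLateness(Duration lateness) {
            // Validate the argument, not the field that still holds the old value.
            long millis = lateness.toMillis();
            if (millis < 0) {
                throw new IllegalArgumentException("The allowed lateness cannot be negative.");
            }
            if (millis != 0 && !eventTimeAssigner) {
                throw new IllegalArgumentException(
                    "Setting the allowed lateness is only valid for event-time windows.");
            }
            this.allowedLateness = millis;
            return this;
        }

        long getAllowedLateness() {
            return allowedLateness;
        }
    }
    ```

    Passing zero on a processing-time stream is accepted here because it equals the default; that is a choice of the sketch, not something decided in this thread.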



[GitHub] flink issue #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2093
  
    Some tests, such as `testLateness()`, still fill the expected output in one block at the end of the test and not inline with the `processElement()`/`processWatermark()` calls. Some, such as `testDropDueToLatenessSliding`, still have the variables `el1`, `el2` and so on that make it hard to parse what is going on.
    
    Please also fix those tests. The session tests look very good now!



[GitHub] flink issue #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2093
  
    For the tests, I think you need to compare the expected results to the actual results the same way the existing tests do, via exact comparison. As written now, the tests would pass even if all actual results had the same value, because they don't verify that we see all expected values, only that the size of the result list matches.

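    A minimal plain-Java illustration of the difference (no Flink test utilities; `OutputChecks` and its methods are hypothetical). A size-only check accepts an output in which every record is the same, while an exact comparison rejects it. The order-insensitive variant sorts copies of both lists first, in the spirit of the sorted comparison used in `testLateness()`.

    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    /** Hypothetical helpers contrasting a size-only check with exact comparisons. */
    final class OutputChecks {

        private OutputChecks() {}

        /** Weak: passes whenever the number of records matches. */
        static boolean sameSize(List<String> expected, List<String> actual) {
            return expected.size() == actual.size();
        }

        /** Exact and ordered: every record must match at the same position. */
        static boolean exactlyEqual(List<String> expected, List<String> actual) {
            return expected.equals(actual);
        }

        /** Exact but order-insensitive: compares sorted copies of both lists. */
        static boolean equalIgnoringOrder(List<String> expected, List<String> actual) {
            List<String> e = new ArrayList<>(expected);
            List<String> a = new ArrayList<>(actual);
            Collections.sort(e);
            Collections.sort(a);
            return e.equals(a);
        }
    }
    ```

    For example, with expected `[sum=2@1999, sum=1@1999]` and actual `[sum=2@1999, sum=2@1999]`, `sameSize` returns `true` while both exact checks return `false`.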


[GitHub] flink issue #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2093
  
    Can you please also add a short javadoc on `EventTimeTriggerAccum`, i.e. what it does and why we need a special trigger for the tests. 



[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2093#discussion_r66787496
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -129,6 +130,12 @@
     	 */
     	protected final TypeSerializer<W> windowSerializer;
     
    +	/**
    +	 * The allowed lateness for elements.
    --- End diff --
    
    This needs to be updated. The allowed lateness is used both for dropping late elements (based on the window into which they fall) and for garbage collection of window state. The default is not set in this operator but in `WindowedStream`.

