Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2017/10/25 09:13:25 UTC

[GitHub] flink pull request #4900: [FLINK-7666] Close TimeService after closing opera...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/4900

    [FLINK-7666] Close TimeService after closing operators.

    
    R @aljoscha 
    
    **(The sections below can be removed for hotfixes of typos)**
    
    ## What is the purpose of the change
    
    Breaks the `quiesceAndWait` of the `TimeService` into two methods, `quiesce` and `wait`, and calls them **after** closing the operators. `quiesce()` is called while holding the `checkpointLock`; `wait()` is called without it.
    
    The original problem was that the `StreamTask` called `quiesceAndAwaitPending()` on the `TimerService` before calling `close()` on the operator. For the continuous file reading process, this meant that with a periodic watermark emitter and a small file (e.g. one split), the timer service would be closed before reading even started (as soon as the reader received the first split), and no timers would be registered to emit watermarks.
    
    ## Verifying this change
    
    Added test in the `OneInputStreamTaskTest`.
    
    ## Documentation
    
      - Does this pull request introduce a new feature? NO
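
The ordering described in the pull request can be illustrated with a minimal, self-contained sketch. It is not Flink's implementation: `ShutdownOrderSketch`, `registerTimer`, `closeOperatorsThenStopTimers` and the `events` list are hypothetical names introduced only for illustration, and a plain `ScheduledThreadPoolExecutor` stands in for the timer service. The operator is closed and the service is quiesced inside the same lock-guarded block; the wait for running timers happens outside the lock, because in this sketch a running timer callback needs the same lock to finish.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a task with a timer service; not Flink code. */
final class ShutdownOrderSketch {

    private final Object checkpointLock = new Object();
    private final AtomicBoolean quiesced = new AtomicBoolean(false);
    private final ScheduledThreadPoolExecutor timers = new ScheduledThreadPoolExecutor(1);

    /** Records what happened, in order, so the sequence can be inspected. */
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    ShutdownOrderSketch() {
        // Timers that have not started yet are dropped once the executor is shut down.
        timers.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    /** Registers a timer; after quiesce the call is silently ignored. */
    void registerTimer(long delayMillis, String name) {
        if (quiesced.get()) {
            return;
        }
        timers.schedule(() -> {
            synchronized (checkpointLock) {
                // The status is checked under the lock before firing.
                if (!quiesced.get()) {
                    events.add("fired:" + name);
                }
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    void closeOperatorsThenStopTimers() {
        synchronized (checkpointLock) {
            events.add("operator-closed");   // close the operator first
            quiesced.set(true);              // then quiesce, still holding the lock
            timers.shutdown();               // non-blocking; pending timers are cleared
            events.add("quiesced");
        }
        try {
            // Wait without the lock so a running timer callback can finish.
            timers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        events.add("awaited");
    }

    public static void main(String[] args) {
        ShutdownOrderSketch task = new ShutdownOrderSketch();
        task.registerTimer(60_000, "late-watermark");
        task.closeOperatorsThenStopTimers();
        System.out.println(task.events);
    }
}
```

Because the timer is still allowed to fire up to the point where the operator is closed, a timer registered while the operator is working (for example, a periodic watermark timer) is not discarded early, which is the situation the description above attributes to the old ordering.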


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink fs-reader

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4900.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4900
    

---

[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4900
  
    I see, okay, the status checking in `trigger()` together with the tight locking contract with StreamTask fixes that.
    
    My feeling is that this is a workaround to support another non-ideal design, but it should work as a temporary fix, because it is not possible to rework the file monitoring source before the next release.
    
    Good with me to merge this...


---

[GitHub] flink pull request #4900: [FLINK-7666] Close TimeService after closing opera...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4900


---

[GitHub] flink pull request #4900: [FLINK-7666] Close TimeService after closing opera...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4900#discussion_r146801477
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java ---
    @@ -73,14 +73,19 @@
     	/**
     	 * This method puts the service into a state where it does not register new timers, but
     	 * returns for each call to {@link #registerTimer(long, ProcessingTimeCallback)} only a "mock" future.
    -	 * Furthermore, the method clears all not yet started timers, and awaits the completion
    -	 * of currently executing timers.
    +	 * Furthermore, the method clears all not yet started timers.
     	 *
     	 * <p>This method can be used to cleanly shut down the timer service. The using components
     	 * will not notice that the service is shut down (as for example via exceptions when registering
     	 * a new timer), but the service will simply not fire any timer any more.
     	 */
    -	public abstract void quiesceAndAwaitPending() throws InterruptedException;
    +	public abstract void quiesce() throws InterruptedException;
    +
    +	/**
    +	 * This method can be used after calling {@link #quiesce()}, and awaits the completion
    +	 * of currently executing timers.
    +	 */
    +	public abstract void awaitPendingAfterShutdown() throws InterruptedException;
    --- End diff --
    
    nit: maybe `awaitPendingAfterQuiesce()` or only `awaitPending()`


---

[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4900
  
    In this case, the original problem will persist, right? (The one in the JIRA).
    In addition, given that:
    1) we check the status of the service before firing a timer, and 
    2) the status is set (through `quiesce()`) in the same guarded block as the close of the operator, so no timers can fire in the meantime,
    I am not sure if we could see any negative side-effects from this change.
    
    What do you think?


---

[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4900
  
    I have one quick question, but maybe I am missing something big here: wouldn't it be possible to simply run `quiesceAndAwaitPending()` before `Operator::close()`, avoiding the locking/flag checking without running into exceptions? If we ensure that the task is no longer in running state, the "forgotten" timers can no longer be reflected in any checkpoint/savepoint, and I think there is no guarantee of consistent output after canceling a job anyway?


---

[GitHub] flink pull request #4900: [FLINK-7666] Close TimeService after closing opera...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4900#discussion_r146802292
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---
    @@ -199,12 +214,20 @@ int getNumTasksScheduled() {
     	 */
     	private static final class TriggerTask implements Runnable {
     
    +		private final SystemProcessingTimeService timerService;
     		private final Object lock;
     		private final ProcessingTimeCallback target;
     		private final long timestamp;
     		private final AsyncExceptionHandler exceptionHandler;
     
    -		TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, ProcessingTimeCallback target, long timestamp) {
    +		private TriggerTask(
    --- End diff --
    
    We should only hand in the `status` field here, not the complete timer service, because that potentially exposes too many things.
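
A minimal sketch of what this suggestion could look like, assuming the service status is kept in an `AtomicInteger`. The class name `TriggerTaskSketch`, the `Callback` interface and the two status constants are hypothetical and only illustrate the idea; they are not the code that was merged. The task receives just the status holder and checks it under the lock before invoking the callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical trigger task that sees only the service status, not the whole service. */
final class TriggerTaskSketch implements Runnable {

    static final int STATUS_ALIVE = 0;
    static final int STATUS_QUIESCED = 1;

    /** Hypothetical stand-in for a processing-time callback. */
    interface Callback {
        void onProcessingTime(long timestamp);
    }

    private final AtomicInteger serviceStatus;
    private final Object lock;
    private final Callback target;
    private final long timestamp;

    TriggerTaskSketch(AtomicInteger serviceStatus, Object lock, Callback target, long timestamp) {
        this.serviceStatus = serviceStatus;
        this.lock = lock;
        this.target = target;
        this.timestamp = timestamp;
    }

    @Override
    public void run() {
        synchronized (lock) {
            // Fire only while the service is alive; a quiesced service stays silent.
            if (serviceStatus.get() == STATUS_ALIVE) {
                target.onProcessingTime(timestamp);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger status = new AtomicInteger(STATUS_ALIVE);
        Object lock = new Object();
        List<Long> fired = new ArrayList<>();

        new TriggerTaskSketch(status, lock, ts -> fired.add(ts), 1L).run();
        status.set(STATUS_QUIESCED);
        new TriggerTaskSketch(status, lock, ts -> fired.add(ts), 2L).run();

        System.out.println(fired);
    }
}
```

Passing only the status holder keeps the task unable to register timers or shut the service down, which is the exposure the comment is concerned about.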


---

[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4900
  
    Perfect! Merging this...


---

[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4900
  
    I think that `quiesce()` should be called before `close()` is called on the operator, so that after `close()` no new timers can fire. That was the main purpose of the original change, because after close(), the operator may not be able to handle firing timers any more.


---

[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4900
  
    CC: @StefanRRichter This almost (but not quite) solves the problem of disposing the RocksDB backend before all timers are finished, I think.


---