Posted to issues@flink.apache.org by bowenli86 <gi...@git.apache.org> on 2018/04/21 06:48:47 UTC

[GitHub] flink pull request #5887: [FLINK-6719] Add details about fault-tolerance of ...

GitHub user bowenli86 opened a pull request:

    https://github.com/apache/flink/pull/5887

    [FLINK-6719] Add details about fault-tolerance of timers to ProcessFunction docs

    ## What is the purpose of the change
    
    The fault tolerance of timers is a frequently asked question on the mailing lists. We should add details about the topic to the ProcessFunction docs.
    
    ## Brief change log
    
    Added details about the topic in the ProcessFunction docs.
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
    none
    
    ## Documentation
    
    none

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bowenli86/flink FLINK-6719

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5887.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5887
    
----
commit 324f68d6595444ae9a84ee1ccced645c51aab471
Author: Bowen Li <bo...@...>
Date:   2018-04-21T06:46:57Z

    [FLINK-6719] Add details about fault-tolerance of timers to ProcessFunction docs

----


---

[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5887#discussion_r183708941
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -322,3 +322,26 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
     {% endhighlight %}
     </div>
     </div>
    +
    +### Fault Tolerance
    +
    +Timers registered within `ProcessFunction` are fault tolerant.
    +
    +Timers registered within `ProcessFunction` will be checkpointed by Flink. Upon restoring, timers that are checkpointed
    +from the previous job will be restored on whatever new instance is responsible for that key.
    +
    +#### Processing Time Timers
    +
    +For processing time timers, note that the firing time of a timer is an absolute value of when to fire.
    +
    +What this means is that if a checkpointed timer’s firing processing timestamp is t (which is basically the registering
    +time + configured trigger time), then it will also fire at processing timestamp t on the new instance. Therefore, you
    --- End diff --
    
    What do you mean by `new instance`? Are you discussing the scenario when a task is recovered on a different machine? I don't think we need to mention this. It should be quite clear that clock synchronization is an issue in processing time. 
    
    The info that a pt-timer fires on restore if its firing time passed while the job was down is important. Also mention that this is true for savepoints, which is even more critical because more time may pass between taking and restoring from a savepoint.
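
To illustrate the restore behaviour described in this comment, here is a minimal sketch in plain Java. It is a simplified model and not Flink's implementation: the class `RestoredTimerSketch` and its methods `snapshot()`, `restore()` and `advanceTo()` are hypothetical names, and the "savepoint" is just a copied list of timestamps. The only behaviour it is meant to show is that timers are absolute timestamps, so a timer whose timestamp already passed while the job was down fires as soon as the restored job runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Simplified model of processing-time timers across a restore (assumption: not Flink's TimerService). */
public class RestoredTimerSketch {

    /** Pending timers, stored as absolute timestamps in milliseconds. */
    private final TreeSet<Long> timers = new TreeSet<>();

    public void registerProcessingTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Stands in for a checkpoint or savepoint: copies the pending timestamps unchanged. */
    public List<Long> snapshot() {
        return new ArrayList<>(timers);
    }

    /** Rebuilds the timer set from a snapshot; timestamps are not shifted by the downtime. */
    public static RestoredTimerSketch restore(List<Long> snapshot) {
        RestoredTimerSketch restored = new RestoredTimerSketch();
        restored.timers.addAll(snapshot);
        return restored;
    }

    /** Removes and returns every timer whose timestamp is less than or equal to the given clock value. */
    public List<Long> advanceTo(long nowMillis) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= nowMillis) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        RestoredTimerSketch job = new RestoredTimerSketch();
        job.registerProcessingTimeTimer(1_000L);
        job.registerProcessingTimeTimer(5_000L);
        List<Long> savepoint = job.snapshot();

        // The job is down for a while and comes back when the clock reads 3000.
        RestoredTimerSketch restored = RestoredTimerSketch.restore(savepoint);
        System.out.println(restored.advanceTo(3_000L)); // [1000]  (overdue, fires right away)
        System.out.println(restored.advanceTo(5_000L)); // [5000]  (fires at its original time)
    }
}
```

The longer the gap between taking and restoring the snapshot, the more timers fall into the "overdue" group, which is why the savepoint case deserves a mention in the docs.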


---

[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5887#discussion_r183707625
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -322,3 +322,26 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
     {% endhighlight %}
     </div>
     </div>
    +
    +### Fault Tolerance
    --- End diff --
    
    Move the `###Fault Tolerance` section above the `###Optimizations` section


---

[GitHub] flink issue #5887: [FLINK-6719] [docs] Add details about fault-tolerance of ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5887
  
    Thanks for the update @bowenli86. 
    
    I'll merge the PR later.


---

[GitHub] flink issue #5887: [FLINK-6719] Add details about fault-tolerance of timers ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/5887
  
    Looks good @bowenli86 


---

[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5887#discussion_r183707389
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -271,9 +271,9 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
     </div>
     </div>
     
    -## Optimizations
    +## Timers
     
    -### Timer Coalescing
    +### Optimizations - Timer Coalescing
     
     Every timer registered at the `TimerService` via `registerEventTimeTimer()` or
    --- End diff --
    
    Move the first paragraph under the `## Timers` section.


---

[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5887#discussion_r184960782
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -271,16 +271,39 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
     </div>
     </div>
     
    -## Optimizations
    +## Timers
     
    -### Timer Coalescing
    +Every timer registered via `registerEventTimeTimer()` or `registerProcessingTimeTimer()` will be stored on `TimerService`
    +and enqueued for execution.
     
    -Every timer registered at the `TimerService` via `registerEventTimeTimer()` or
    -`registerProcessingTimeTimer()` will be stored on the Java heap and enqueued for execution. There is,
    -however, a maximum of one timer per key and timestamp at a millisecond resolution and thus, in the
    -worst case, every key may have a timer for each upcoming millisecond. Even if you do not do any
    -processing for outdated timers in `onTimer`, this may put a significant burden on the
    -Flink runtime.
    +Invocations of `onTimer()` and `processElement()` are always synchronized, so that users don't have to worry about
    +concurrent modification of state.
    +
    +Note that there is a maximum of one timer per key and timestamp at a millisecond resolution and thus, in the worst case,
    +every key may have a timer for each upcoming millisecond. Even if you do not do any processing for outdated timers in `onTimer`,
    +this may put a significant burden on the Flink runtime.
    +
    +### Fault Tolerance
    +
    +Timers registered within `ProcessFunction` are fault tolerant. They are synchronously checkpointed by Flink, regardless of
    +configurations of state backends. (Therefore, a large number of timers can significantly increase checkpointing time. See optimizations
    +section for advice to reduce the number of timers.)
    --- End diff --
    
    See the optimizations section for advice on how to reduce the number of timers.


---

[GitHub] flink issue #5887: [FLINK-6719] [docs] Add details about fault-tolerance of ...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5887
  
    @fhueske updated! let me know how it looks now


---

[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5887#discussion_r183710303
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -322,3 +322,26 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
     {% endhighlight %}
     </div>
     </div>
    +
    +### Fault Tolerance
    +
    +Timers registered within `ProcessFunction` are fault tolerant.
    +
    +Timers registered within `ProcessFunction` will be checkpointed by Flink. Upon restoring, timers that are checkpointed
    +from the previous job will be restored on whatever new instance is responsible for that key.
    +
    +#### Processing Time Timers
    +
    +For processing time timers, note that the firing time of a timer is an absolute value of when to fire.
    +
    +What this means is that if a checkpointed timer’s firing processing timestamp is t (which is basically the registering
    +time + configured trigger time), then it will also fire at processing timestamp t on the new instance. Therefore, you
    +should be aware of out-of-sync clocks between the 2 instances.
    +
    +Another thing to note is that if the restored job isn’t running at t (when the timer is supposed to fire), then on restore,
    +that timer is fired immediately.
    +
    +#### Event Time Timers
    +
    +For event time timers, given that Flink does not checkpoint watermarks, a restored event time timer will fire when the
    --- End diff --
    
    The fact that Flink doesn't checkpoint watermarks is not really related to and does not affect the behavior of timers. It is useful information but I don't think we need to mention it here.
    
    It's sufficient to mention that et-timers fire when the watermark passes them.
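
As a rough illustration of the rule stated in this comment, the following plain-Java sketch fires event-time timers once the watermark reaches them. It is a simplified single-key model, not Flink code: `EventTimeTimerSketch` and `onWatermark()` are hypothetical names, and starting the watermark at `Long.MIN_VALUE` is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Simplified single-key model of event-time timers (assumption: not Flink's implementation). */
public class EventTimeTimerSketch {

    /** Pending timers; a set, so there is at most one timer per timestamp. */
    private final TreeSet<Long> timers = new TreeSet<>();

    /** Assumption of this sketch: no watermark has been seen yet at start-up. */
    private long currentWatermark = Long.MIN_VALUE;

    public void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark (it never moves backwards) and returns the timers it has reached. */
    public List<Long> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch operator = new EventTimeTimerSketch();
        operator.registerEventTimeTimer(100L);
        operator.registerEventTimeTimer(200L);

        System.out.println(operator.onWatermark(50L));  // []
        System.out.println(operator.onWatermark(150L)); // [100]
        System.out.println(operator.onWatermark(200L)); // [200]
    }
}
```

Nothing in the model depends on wall-clock time, so downtime between snapshot and restore does not by itself cause an event-time timer to fire.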


---

[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5887#discussion_r183708069
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -322,3 +322,26 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
     {% endhighlight %}
     </div>
     </div>
    +
    +### Fault Tolerance
    +
    +Timers registered within `ProcessFunction` are fault tolerant.
    +
    +Timers registered within `ProcessFunction` will be checkpointed by Flink. Upon restoring, timers that are checkpointed
    +from the previous job will be restored on whatever new instance is responsible for that key.
    --- End diff --
    
    Add a note that timers are synchronously checkpointed (regardless of the configuration of the state backend). Hence, a large number of timers can significantly increase checkpointing time. See optimizations section for advice to reduce the number of timers.
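
To make the link between timer count and checkpointing cost more concrete, here is a small plain-Java sketch of the coalescing idea that the optimizations section refers to. It is an illustration under assumptions: `TimerCoalescingSketch`, `coalesce()` and `distinctTimers()` are hypothetical names, timestamps are assumed to be non-negative milliseconds, and a `HashSet` stands in for the rule of at most one timer per key and timestamp.

```java
import java.util.HashSet;
import java.util.Set;

/** Counts how many distinct timers one key accumulates with and without coalescing. */
public class TimerCoalescingSketch {

    /** Rounds a non-negative millisecond timestamp down to the full second. */
    public static long coalesce(long targetTimeMillis) {
        return (targetTimeMillis / 1000) * 1000;
    }

    /**
     * Simulates one element per millisecond for a single key, each registering a timer
     * at (arrival time + timeout), and returns the number of distinct timers that remain.
     */
    public static int distinctTimers(long startMillis, int elements, long timeoutMillis, boolean coalesced) {
        Set<Long> timers = new HashSet<>(); // one entry per timestamp for this key
        for (int i = 0; i < elements; i++) {
            long target = startMillis + i + timeoutMillis;
            timers.add(coalesced ? coalesce(target) : target);
        }
        return timers.size();
    }

    public static void main(String[] args) {
        // 3000 elements, one per millisecond, each with a 60 second timeout.
        System.out.println(distinctTimers(0L, 3000, 60_000L, false)); // 3000
        System.out.println(distinctTimers(0L, 3000, 60_000L, true));  // 3
    }
}
```

Fewer distinct timers means less timer state to write on each checkpoint, at the price of timers firing with one-second rather than one-millisecond precision.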


---

[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5887#discussion_r183708604
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -322,3 +322,26 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
     {% endhighlight %}
     </div>
     </div>
    +
    +### Fault Tolerance
    +
    +Timers registered within `ProcessFunction` are fault tolerant.
    +
    +Timers registered within `ProcessFunction` will be checkpointed by Flink. Upon restoring, timers that are checkpointed
    +from the previous job will be restored on whatever new instance is responsible for that key.
    +
    +#### Processing Time Timers
    +
    +For processing time timers, note that the firing time of a timer is an absolute value of when to fire.
    +
    +What this means is that if a checkpointed timer’s firing processing timestamp is t (which is basically the registering
    --- End diff --
    
    > (which is basically the registering time + configured trigger time)
    
    This is often the case, but not necessarily true. Esp. for processing time, the timer can also be set to something completely different. I'd remove this to avoid confusion.


---

[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5887


---

[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5887#discussion_r183710783
  
    --- Diff: docs/dev/stream/operators/process_function.md ---
    @@ -271,9 +271,9 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
     </div>
     </div>
     
    -## Optimizations
    +## Timers
     
    -### Timer Coalescing
    +### Optimizations - Timer Coalescing
     
     Every timer registered at the `TimerService` via `registerEventTimeTimer()` or
    --- End diff --
    
    Also it would be great if you could find a good spot to add a note that calls to `processElement()` and `onTimer()` are always synchronized, i.e., users do not have to worry about concurrent modification of state.
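
The practical consequence of this guarantee can be sketched in plain Java. The model below is only one way such a guarantee could be provided and makes no claim about how Flink implements it: `SynchronizedCallbacksSketch`, `deliverElement()`, `deliverTimer()` and `runDemo()` are hypothetical names, and a single shared lock is an assumption of the sketch. The point is that the user callbacks themselves touch plain, unsynchronized state and still never interleave.

```java
/** Model of a runtime that never runs processElement() and onTimer() at the same time. */
public class SynchronizedCallbacksSketch {

    /** Assumption of this sketch: one lock guards every callback invocation. */
    private final Object lock = new Object();

    /** Plain, non-thread-safe "user state". */
    private long count;

    // "User code": no locking or atomics needed here.
    private void processElement() { count++; }
    private void onTimer() { count++; }

    // "Runtime code": serializes both kinds of callback on the same lock.
    public void deliverElement() { synchronized (lock) { processElement(); } }
    public void deliverTimer() { synchronized (lock) { onTimer(); } }

    public long count() { synchronized (lock) { return count; } }

    /** Runs one element thread and one timer thread concurrently and returns the final count. */
    public static long runDemo(int callsPerThread) {
        SynchronizedCallbacksSketch operator = new SynchronizedCallbacksSketch();
        Thread elements = new Thread(() -> {
            for (int i = 0; i < callsPerThread; i++) operator.deliverElement();
        });
        Thread timers = new Thread(() -> {
            for (int i = 0; i < callsPerThread; i++) operator.deliverTimer();
        });
        elements.start();
        timers.start();
        try {
            elements.join();
            timers.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return operator.count();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(100_000)); // 200000
    }
}
```

Without the shared lock, the two `count++` calls could race and lose updates; with it, every increment is applied, which is what "users do not have to worry about concurrent modification of state" amounts to in this model.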


---