Posted to commits@beam.apache.org by "Grzegorz Kołakowski (JIRA)" <ji...@apache.org> on 2018/03/16 10:37:00 UTC

[jira] [Comment Edited] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode

    [ https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401721#comment-16401721 ] 

Grzegorz Kołakowski edited comment on BEAM-2393 at 3/16/18 10:36 AM:
---------------------------------------------------------------------

The times are very low: usually <= 30 ms, occasionally around 150 ms.

I've manually built Flink 1.4.0 with one additional log statement in {{org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint}}:
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        LOG.debug("Lock acquired {}. task {}", lock, getName()); // new line
        // ... rest of the original method body, unchanged ...
    }
}
{code}
The corresponding logs are as follows:
{noformat}
2018-03-16 09:48:36,962 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (1) SAVEPOINT on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,953 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Lock acquired. task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,989 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 1 on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,990 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished synchronous part of checkpoint 1.Alignment duration: 0 ms, snapshot duration 33 ms
2018-03-16 09:48:52,012 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished asynchronous part of checkpoint 1. Asynchronous duration: 22 ms{noformat}
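Subtracting the timestamps in the log lines above gives the breakdown of where the time goes. A small sketch in plain Java; the class name {{CheckpointLogTiming}} is made up for illustration, and only the four timestamps come from the logs:

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CheckpointLogTiming {
    // Matches the timestamp prefix of the log lines, e.g. "2018-03-16 09:48:36,962".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Milliseconds elapsed between two log timestamps.
    static long millisBetween(String from, String to) {
        LocalDateTime start = LocalDateTime.parse(from, LOG_TS);
        LocalDateTime end = LocalDateTime.parse(to, LOG_TS);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        String started = "2018-03-16 09:48:36,962";      // "Starting checkpoint (1) SAVEPOINT"
        String lockAcquired = "2018-03-16 09:48:51,953"; // "Lock acquired"
        String syncDone = "2018-03-16 09:48:51,989";     // "Finished synchronous checkpoints"
        String asyncDone = "2018-03-16 09:48:52,012";    // "finished asynchronous part"

        System.out.println("waiting for lock:  " + millisBetween(started, lockAcquired) + " ms");
        System.out.println("synchronous part:  " + millisBetween(lockAcquired, syncDone) + " ms");
        System.out.println("start -> async end: " + millisBetween(started, asyncDone) + " ms");
    }
}
{code}

Run as-is, it prints 14991 ms waiting for the lock, 36 ms for the synchronous part, and 15050 ms from the start of {{performCheckpoint}} to the end of the asynchronous part, so almost all of the task-side checkpoint time is spent before the lock is acquired.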
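In my last run, the entire checkpoint took ~16 s. Please note that the thread spent ~15 s of that waiting to acquire the lock (between the "Starting checkpoint" and "Lock acquired" lines), while the synchronous and asynchronous parts themselves took only tens of milliseconds.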
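The same wait can also be measured in-process instead of by comparing log timestamps. A minimal sketch in plain Java, not Flink code: {{LockWaitProbe}}, {{runTimedUnderLock}} and {{measureWaitWhileHeld}} are hypothetical names, and a sleeping helper thread stands in for whatever is holding the lock while the checkpoint waits:

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LockWaitProbe {

    // Takes a timestamp before entering the monitor and another once inside,
    // runs the body while holding the lock, and returns the wait in milliseconds.
    static long runTimedUnderLock(Object lock, Runnable body) {
        long before = System.nanoTime();
        synchronized (lock) {
            long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - before);
            body.run();
            return waitedMs;
        }
    }

    // Starts a helper thread that holds the lock for holdMs, then measures how long
    // the calling thread waits to enter the same monitor.
    static long measureWaitWhileHeld(long holdMs) throws InterruptedException {
        Object lock = new Object();
        CountDownLatch holding = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            synchronized (lock) {
                holding.countDown(); // signal that the lock is now held
                try {
                    Thread.sleep(holdMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        holder.start();
        holding.await(); // start timing only once the helper owns the lock

        long waited = runTimedUnderLock(lock, () -> {
            // checkpoint work would run here, under the lock
        });
        holder.join();
        return waited;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waited " + measureWaitWhileHeld(500) + " ms for the lock");
    }
}
{code}

With a 500 ms hold it should report a wait close to 500 ms; the exact figure depends on thread scheduling, which is why no fixed output is given.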


> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -----------------------------------------------------------------
>
>                 Key: BEAM-2393
>                 URL: https://issues.apache.org/jira/browse/BEAM-2393
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Jingsong Lee
>            Assignee: Grzegorz Kołakowski
>            Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when the job is restarted after a failure, it sends duplicate data.
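A minimal sketch of the kind of state the issue says is missing, in plain Java rather than Flink's or Beam's actual interfaces: {{OffsetTrackingSource}}, {{snapshotState}}, {{restoreState}} and {{runWithFailure}} are all hypothetical. The source records the index of the next element to emit; a restarted instance restores that index instead of re-reading from the beginning. Emitting an element and advancing the index happen under one lock, and the snapshot takes the same lock, so a snapshot never falls between the two. As a simplification, it assumes output emitted before the checkpoint is kept downstream:

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical bounded source over a fixed list that can snapshot and restore its read position.
public class OffsetTrackingSource {
    private final List<String> elements;
    private final Object checkpointLock = new Object();
    private int nextIndex = 0; // index of the next element to emit

    OffsetTrackingSource(List<String> elements) {
        this.elements = elements;
    }

    // Emits up to n elements; each emit plus position update is atomic w.r.t. snapshots.
    void emit(int n, Consumer<String> out) {
        for (int i = 0; i < n && nextIndex < elements.size(); i++) {
            synchronized (checkpointLock) {
                out.accept(elements.get(nextIndex));
                nextIndex++;
            }
        }
    }

    int snapshotState() {
        synchronized (checkpointLock) {
            return nextIndex;
        }
    }

    void restoreState(int snapshot) {
        synchronized (checkpointLock) {
            nextIndex = snapshot;
        }
    }

    // Emits two elements, checkpoints, "fails", and continues on a fresh instance.
    static List<String> runWithFailure(List<String> data, boolean restore) {
        List<String> downstream = new ArrayList<>();
        OffsetTrackingSource source = new OffsetTrackingSource(data);
        source.emit(2, downstream::add);
        int checkpoint = source.snapshotState();
        OffsetTrackingSource restarted = new OffsetTrackingSource(data); // in-memory state lost
        if (restore) {
            restarted.restoreState(checkpoint);
        }
        restarted.emit(Integer.MAX_VALUE, downstream::add);
        return downstream;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(runWithFailure(data, true));  // [a, b, c, d, e]
        System.out.println(runWithFailure(data, false)); // [a, b, a, b, c, d, e]
    }
}
{code}

The second line of output is the duplicate-data symptom from the issue: without a restored position, the restarted source replays "a" and "b".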



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)