Posted to commits@beam.apache.org by "Grzegorz Kołakowski (JIRA)" <ji...@apache.org> on 2018/03/16 10:37:00 UTC
[jira] [Comment Edited] (BEAM-2393) BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
[ https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401721#comment-16401721 ]
Grzegorz Kołakowski edited comment on BEAM-2393 at 3/16/18 10:36 AM:
---------------------------------------------------------------------
The times are very low, usually <= 30ms, sometimes around 150ms.
I've manually built Flink 1.4.0 with one additional log statement in {{org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint}}:
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        LOG.debug("Lock acquired {}. task {}", lock, getName()); // new line
{code}
The corresponding logs are as follows:
{noformat}
2018-03-16 09:48:36,962 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (1) SAVEPOINT on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,953 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Lock acquired. task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,989 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 1 on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,990 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished synchronous part of checkpoint 1.Alignment duration: 0 ms, snapshot duration 33 ms
2018-03-16 09:48:52,012 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished asynchronous part of checkpoint 1. Asynchronous duration: 22 ms{noformat}
In my last attempt, the entire checkpoint took ~16s. Note that the checkpointing thread spent about 15s of that time waiting on the lock.
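As a quick cross-check of the figures above, here is a minimal sketch that computes the gaps between the log timestamps. The class and method names ({{CheckpointLockWait}}, {{millisBetween}}) are made up for illustration and are not part of Flink or Beam; the three timestamps are copied from the "Starting checkpoint", "Lock acquired" and "finished asynchronous part" log lines.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CheckpointLockWait {
    // Matches the timestamp prefix of the log lines, e.g. "2018-03-16 09:48:36,962".
    private static final DateTimeFormatter LOG_TS =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps (hypothetical helper). */
    public static long millisBetween(String from, String to) {
        return Duration.between(
            LocalDateTime.parse(from, LOG_TS),
            LocalDateTime.parse(to, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        String started = "2018-03-16 09:48:36,962";       // "Starting checkpoint (1)"
        String lockAcquired = "2018-03-16 09:48:51,953";  // "Lock acquired"
        String asyncFinished = "2018-03-16 09:48:52,012"; // "finished asynchronous part"

        // Time spent between entering performCheckpoint and holding the lock.
        System.out.println("lock wait ms: " + millisBetween(started, lockAcquired));
        // Time covered by the logged lines for this checkpoint on this task.
        System.out.println("logged span ms: " + millisBetween(started, asyncFinished));
    }
}
```

Run against these timestamps, it prints a lock wait of 14991 ms out of a logged span of 15050 ms, so nearly all of the time visible in this excerpt is spent waiting to enter the synchronized block rather than taking the snapshot.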
> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -----------------------------------------------------------------
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Jingsong Lee
> Assignee: Grzegorz Kołakowski
> Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when a job restarts after a failure, it sends duplicate data.
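To illustrate the problem described in the issue, here is a minimal, self-contained sketch of why a source without snapshot/restore re-emits data after a restart. Everything in it ({{ResumableSourceSketch}}, {{ListSource}}, {{runWithFailure}}) is hypothetical and does not reflect the actual {{BoundedSourceWrapper}} code or Flink's state API; it assumes the failure happens right after a checkpoint taken once two records have been emitted, so the input needs at least two records.

```java
import java.util.ArrayList;
import java.util.List;

public class ResumableSourceSketch {

    /** Hypothetical bounded source over a fixed list; not Beam's BoundedSourceWrapper. */
    static final class ListSource {
        private final List<String> records;
        private int nextIndex = 0; // position of the next record to emit

        ListSource(List<String> records) {
            this.records = records;
        }

        boolean hasNext() {
            return nextIndex < records.size();
        }

        String next() {
            return records.get(nextIndex++);
        }

        /** Captures the read position so a restarted source can resume from it. */
        int snapshot() {
            return nextIndex;
        }

        /** Resumes reading from a previously captured position. */
        void restore(int snapshotIndex) {
            this.nextIndex = snapshotIndex;
        }
    }

    /**
     * Emits two records, takes a checkpoint, simulates a failure, then restarts
     * with a fresh source instance that either restores the checkpoint or not.
     */
    public static List<String> runWithFailure(List<String> input, boolean restoreFromSnapshot) {
        List<String> emitted = new ArrayList<>();

        ListSource source = new ListSource(input);
        emitted.add(source.next());
        emitted.add(source.next());
        int checkpoint = source.snapshot();

        // Simulated failure: the old instance is discarded and a new one is created.
        ListSource restarted = new ListSource(input);
        if (restoreFromSnapshot) {
            restarted.restore(checkpoint);
        }
        while (restarted.hasNext()) {
            emitted.add(restarted.next());
        }
        return emitted;
    }
}
```

With the input a, b, c, the run without restore emits a, b, a, b, c (the first two records are duplicated), while the run with restore emits a, b, c.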
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)