Posted to user@flink.apache.org by Alex Vinnik <al...@gmail.com> on 2018/08/16 19:03:24 UTC

Why checkpoint took so long

I noticed something strange with checkpointing in Flink 1.3. A checkpoint
succeeded, but it took very long: 15 minutes 53 seconds. The checkpoint
metadata on S3 is only 1.7 MiB. Most of the time, checkpoints actually fail.

aws --profile cure s3 ls --recursive --summarize --human s3://curation-two-admin/flink/sa-checkpoint/sa1/checkpoint_metadata-c99cfda10951
2018-08-16 13:34:07    1.7 MiB flink/sa-checkpoint/sa1/checkpoint_metadata-c99cfda10951

I came across this discussion:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-td12762.html#a19370
But there the problem appears to have been caused by high back pressure,
which is not the case for me.

taskmanager.network.memory.max    128 MB
This is very small; I was hoping to get faster checkpoints with smaller
buffers. The job reads from durable storage (S3), so I am not worried about
buffering reads when writing is slow.

Any ideas what could cause such slow checkpointing? Thanks. -Alex

[image: Screen Shot 2018-08-16 at 1.43.23 PM.png]

Re: Why checkpoint took so long

Posted by vino yang <ya...@gmail.com>.
Hi Alex,

The code that produces this log message is:

// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
    wasPendingCheckpoint = true;
    LOG.warn("Received late message for now expired checkpoint attempt {} from " +
        "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
}


So the attempt ID contained in your log actually identifies a subtask
instance (an execution attempt) of the job.
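
To make the condition behind that warning easier to follow, here is a minimal sketch of the bookkeeping it implies. It is a toy model and not Flink's CheckpointCoordinator: the class name, the AckResult enum, the MAX_REMEMBERED limit and the explicit expire() call are all assumptions made for illustration. Only the name recentPendingCheckpoints is taken from the snippet in this message.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Toy model of checkpoint expiry. This is NOT Flink's CheckpointCoordinator. */
public class LateAckSketch {

    /** Hypothetical outcome of an acknowledgement in this model. */
    public enum AckResult { ACCEPTED, LATE_FOR_EXPIRED, UNKNOWN }

    // Hypothetical cap on how many expired checkpoint ids are remembered.
    private static final int MAX_REMEMBERED = 16;

    private final long timeoutMillis;
    // checkpoint id -> trigger time of checkpoints still waiting for acknowledgements
    private final Map<Long, Long> pendingTriggerTimes = new HashMap<>();
    // ids of checkpoints that were pending once but have since been dropped
    private final ArrayDeque<Long> recentPendingCheckpoints = new ArrayDeque<>();

    public LateAckSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    public void trigger(long checkpointId, long nowMillis) {
        pendingTriggerTimes.put(checkpointId, nowMillis);
    }

    /** Drops every pending checkpoint that reached the timeout, remembering its id. */
    public void expire(long nowMillis) {
        Iterator<Map.Entry<Long, Long>> it = pendingTriggerTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                Long expiredId = entry.getKey();
                it.remove();
                if (recentPendingCheckpoints.size() >= MAX_REMEMBERED) {
                    recentPendingCheckpoints.removeFirst();
                }
                recentPendingCheckpoints.addLast(expiredId);
            }
        }
    }

    /** Classifies an acknowledgement from a task for the given checkpoint id. */
    public AckResult acknowledge(long checkpointId) {
        if (pendingTriggerTimes.containsKey(checkpointId)) {
            return AckResult.ACCEPTED;
        }
        if (recentPendingCheckpoints.contains(checkpointId)) {
            // This is the situation in which the warning from the log would be written.
            return AckResult.LATE_FOR_EXPIRED;
        }
        return AckResult.UNKNOWN;
    }

    public static void main(String[] args) {
        long fifteenMinutes = 15 * 60 * 1000L;
        LateAckSketch coordinator = new LateAckSketch(fifteenMinutes);
        coordinator.trigger(253L, 0L);
        System.out.println(coordinator.acknowledge(253L)); // ACCEPTED
        coordinator.expire(fifteenMinutes);
        System.out.println(coordinator.acknowledge(253L)); // LATE_FOR_EXPIRED
        System.out.println(coordinator.acknowledge(999L)); // UNKNOWN
    }
}
```

Read this way, each warning means that a task did finish its part of checkpoint 253, but only after the checkpoint had already been dropped as expired.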

You can find more useful information in the JM and TM logs by searching for
the job ID, but you first need to set your log level to DEBUG; you will then
see more checkpoint-related log messages.

Once you have done that, search for the following log message, combined with
your attempt ID.


log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);


Thanks, vino.



On Mon, Aug 20, 2018 at 11:51 PM Alex Vinnik <al...@gmail.com> wrote:

> Vino,
>
> Here is what I see in JM.
> 2018-08-20T15:18:09.372+0000
> [org.apache.flink.runtime.checkpoint.CheckpointCoordinator] WARN  Received
> late message for now expired checkpoint attempt 253 from
> e06b6c3a059ca809f519effba0b119ae of job 330e0d4fa4ab05198ed9afbcfcf0a1f7.
> 2018-08-20T15:18:11.430+0000
> [org.apache.flink.runtime.checkpoint.CheckpointCoordinator] WARN  Received
> late message for now expired checkpoint attempt 253 from
> 9d679ab858959ab3236e23a1f8e66484 of job 330e0d4fa4ab05198ed9afbcfcf0a1f7.
> 2018-08-20T15:18:17.890+0000
> [org.apache.flink.runtime.checkpoint.CheckpointCoordinator] WARN  Received
> late message for now expired checkpoint attempt 253 from
> 5704d6f11912bef272a23d813aab3d4f of job 330e0d4fa4ab05198ed9afbcfcf0a1f7.
>
> Searching for "checkpoint" in the taskmanager logs returns nothing. Is there
> anything specific I can search for?
>
> Thanks
> -Alex

Re: Why checkpoint took so long

Posted by Alex Vinnik <al...@gmail.com>.
Vino,

Here is what I see in JM.
2018-08-20T15:18:09.372+0000
[org.apache.flink.runtime.checkpoint.CheckpointCoordinator] WARN  Received
late message for now expired checkpoint attempt 253 from
e06b6c3a059ca809f519effba0b119ae of job 330e0d4fa4ab05198ed9afbcfcf0a1f7.
2018-08-20T15:18:11.430+0000
[org.apache.flink.runtime.checkpoint.CheckpointCoordinator] WARN  Received
late message for now expired checkpoint attempt 253 from
9d679ab858959ab3236e23a1f8e66484 of job 330e0d4fa4ab05198ed9afbcfcf0a1f7.
2018-08-20T15:18:17.890+0000
[org.apache.flink.runtime.checkpoint.CheckpointCoordinator] WARN  Received
late message for now expired checkpoint attempt 253 from
5704d6f11912bef272a23d813aab3d4f of job 330e0d4fa4ab05198ed9afbcfcf0a1f7.

Searching for "checkpoint" in the taskmanager logs returns nothing. Is there
anything specific I can search for?
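
For reference, the warnings above can be summarized per checkpoint with a small stand-alone program. This is only a sketch: the class and method names are hypothetical, and the regular expression is derived from the three log lines shown in this message, so it may need adjusting for other Flink versions or log layouts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Groups "late message" warnings from a JobManager log by checkpoint id. */
public class LateAckLogSummary {

    // Pattern assumed from the log lines quoted in this thread.
    private static final Pattern LATE_ACK = Pattern.compile(
        "Received late message for now expired checkpoint attempt (\\d+) from "
            + "([0-9a-f]+) of job ([0-9a-f]+)");

    /** Returns checkpoint id -> execution attempt ids whose acknowledgement came too late. */
    public static Map<Long, List<String>> lateAttemptsByCheckpoint(String logText) {
        // The lines in the mail are wrapped, so collapse all whitespace before matching.
        String normalized = logText.replaceAll("\\s+", " ");
        Map<Long, List<String>> result = new LinkedHashMap<>();
        Matcher matcher = LATE_ACK.matcher(normalized);
        while (matcher.find()) {
            long checkpointId = Long.parseLong(matcher.group(1));
            result.computeIfAbsent(checkpointId, id -> new ArrayList<>())
                  .add(matcher.group(2));
        }
        return result;
    }

    public static void main(String[] args) {
        String sample =
            "2018-08-20T15:18:09.372+0000\n"
            + "[org.apache.flink.runtime.checkpoint.CheckpointCoordinator] WARN  Received\n"
            + "late message for now expired checkpoint attempt 253 from\n"
            + "e06b6c3a059ca809f519effba0b119ae of job 330e0d4fa4ab05198ed9afbcfcf0a1f7.\n"
            + "2018-08-20T15:18:11.430+0000\n"
            + "[org.apache.flink.runtime.checkpoint.CheckpointCoordinator] WARN  Received\n"
            + "late message for now expired checkpoint attempt 253 from\n"
            + "9d679ab858959ab3236e23a1f8e66484 of job 330e0d4fa4ab05198ed9afbcfcf0a1f7.\n";

        for (Map.Entry<Long, List<String>> entry : lateAttemptsByCheckpoint(sample).entrySet()) {
            System.out.println("checkpoint " + entry.getKey() + ": "
                + entry.getValue().size() + " late acknowledgement(s) from " + entry.getValue());
        }
    }
}
```

The attempt IDs collected this way are the values to search for in the taskmanager logs.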

Thanks
-Alex


On Fri, Aug 17, 2018 at 7:23 AM vino yang <ya...@gmail.com> wrote:

> Hi Alex,
>
> Have you checked whether Flink timed out while accessing the file
> system?
>
> Can you share the JM log and the checkpoint-specific log from the TM?
>
> Thanks, vino.

Re: Why checkpoint took so long

Posted by vino yang <ya...@gmail.com>.
Hi Alex,

Have you checked whether Flink timed out while accessing the file
system?

Can you share the JM log and the checkpoint-specific log from the TM?

Thanks, vino.

On Fri, Aug 17, 2018 at 11:51 AM Alex Vinnik <al...@gmail.com> wrote:

> Vino,
>
> 1. No custom implementations for the source or for checkpointing. The
> source is JSON files on S3.
>
> JsonLinesInputFormat format = new JsonLinesInputFormat(new Path(customerPath), configuration);
> format.setFilesFilter(FilePathFilter.createDefaultFilter());
> // Read JSON objects from the given path, monitoring it continuously for updates
> env
>    .readFile(format, customerPath, FileProcessingMode.PROCESS_CONTINUOUSLY, pollInterval.toMillis())
>
>
> RocksDB is used as the state backend.
>
> 2. The majority of checkpoints time out after 15 minutes.
>
> Thanks

Re: Why checkpoint took so long

Posted by Alex Vinnik <al...@gmail.com>.
Vino,

1. No custom implementations for the source or for checkpointing. The
source is JSON files on S3.

JsonLinesInputFormat format = new JsonLinesInputFormat(new Path(customerPath), configuration);
format.setFilesFilter(FilePathFilter.createDefaultFilter());
// Read JSON objects from the given path, monitoring it continuously for updates
env
   .readFile(format, customerPath, FileProcessingMode.PROCESS_CONTINUOUSLY, pollInterval.toMillis())


RocksDB is used as the state backend.

2. The majority of checkpoints time out after 15 minutes.

Thanks

On Thu, Aug 16, 2018 at 8:48 PM vino yang <ya...@gmail.com> wrote:

> Hi Alex,
>
> I still have a few questions:
>
> 1) Did you implement this file source and the checkpoint logic yourself?
> 2) For the other, failed checkpoints, can you share the corresponding
> failure log or more details, such as whether they failed due to a timeout
> or for other reasons?
>
> Thanks, vino.

Re: Why checkpoint took so long

Posted by vino yang <ya...@gmail.com>.
Hi Alex,

I still have a few questions:

1) Did you implement this file source and the checkpoint logic yourself?
2) For the other, failed checkpoints, can you share the corresponding
failure log or more details, such as whether they failed due to a timeout
or for other reasons?

Thanks, vino.
