Posted to issues@flink.apache.org by "Roman Khachatryan (Jira)" <ji...@apache.org> on 2020/05/27 21:00:06 UTC

[jira] [Comment Edited] (FLINK-17988) Checkpointing slows down after reaching state.checkpoints.num-retained

    [ https://issues.apache.org/jira/browse/FLINK-17988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118055#comment-17118055 ] 

Roman Khachatryan edited comment on FLINK-17988 at 5/27/20, 8:59 PM:
---------------------------------------------------------------------

The thread stack traces show that fs.delete() on both filesystems triggers an object listing on S3:
{code:java}
 at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:895)
 at com.facebook.presto.hive.s3.PrestoS3FileSystem.listPrefix(PrestoS3FileSystem.java:484)
 at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:315)
 at com.facebook.presto.hive.s3.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:450)
 at com.facebook.presto.hive.s3.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:427)
 at org.apache.flink.fs.s3presto.common.HadoopFileSystem.delete(HadoopFileSystem.java:147)
 at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:150)
 at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:86)
 at org.apache.flink.runtime.state.AbstractChannelStateHandle.discardState(AbstractChannelStateHandle.java:55)
 at org.apache.flink.runtime.state.StateUtil$$Lambda$430/787068135.accept(Unknown Source)
 at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:55)
 at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:60)
 at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:236)
 at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:132)
 at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:263)
 at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:218)
 at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1005)
 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:912)
{code}
 
{code:java}
 at org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:1255)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2223)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
 at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.delete(HadoopFileSystem.java:147)
 at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:150)
 at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:86)
 at org.apache.flink.runtime.state.AbstractChannelStateHandle.discardState(AbstractChannelStateHandle.java:60)
 at org.apache.flink.runtime.state.StateUtil$$Lambda$428/480418082.accept(Unknown Source)
 at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:55)
 at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:60)
 at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:236)
 at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:132)
 at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:263)
 at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:218)
 at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1005)
 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:912){code}
It is unclear why aligned mode is less affected (it lists objects too).
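
The call pattern common to both traces (delete() resolves the file status first, which issues a list request) can be sketched as follows. This is only an illustration against a hypothetical in-memory store: ListingDeleteSketch, prefixExists and discardCheckpoint are made-up names, not Flink, Presto or Hadoop APIs, and the real filesystems may issue additional requests per delete.

```java
import java.util.TreeSet;

/** Hypothetical in-memory object store that counts requests; not a real S3 client. */
class ListingDeleteSketch {
    private final TreeSet<String> keys = new TreeSet<>();
    int listRequests = 0;
    int deleteRequests = 0;

    void put(String key) {
        keys.add(key);
    }

    /** Stands in for the list call made while resolving the file status. */
    boolean prefixExists(String prefix) {
        listRequests++;
        String first = keys.ceiling(prefix);
        return first != null && first.startsWith(prefix);
    }

    /** Mirrors the shape of the traces: one listing before every delete. */
    boolean delete(String key) {
        if (!prefixExists(key)) {
            return false;
        }
        deleteRequests++;
        return keys.remove(key);
    }

    /** Discards a checkpoint of the given number of files; returns {lists, deletes}. */
    static int[] discardCheckpoint(int files) {
        ListingDeleteSketch store = new ListingDeleteSketch();
        for (int i = 0; i < files; i++) {
            store.put("chk-1/" + i);
        }
        for (int i = 0; i < files; i++) {
            store.delete("chk-1/" + i);
        }
        return new int[] {store.listRequests, store.deleteRequests};
    }

    public static void main(String[] args) {
        int[] counts = discardCheckpoint(1000);
        System.out.println(counts[0] + " list requests, " + counts[1] + " delete requests");
    }
}
```

Under this model, discarding a checkpoint of about 1K files costs about 1K list requests on top of the deletes, all issued sequentially by the discarding thread.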

Triggering of new requests is blocked on the monitor, which is held by discardOnSubsume. This seems reasonable: otherwise checkpoints to discard would pile up, and a triggered checkpoint wouldn't be able to start anyway.
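
The blocking described above can be reproduced in a minimal sketch. It assumes a single monitor shared by completion/discard and triggering; CoordinatorLockSketch and its methods are hypothetical stand-ins, not the actual CheckpointCoordinator code, and the sleep merely simulates the per-file listing and delete round trip.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-in for the coordinator; not Flink's actual class. */
class CoordinatorLockSketch {
    private final Object lock = new Object();
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());

    /** Discards a subsumed checkpoint file by file while holding the monitor. */
    void completeAndDiscard(int files, long millisPerDelete, CountDownLatch lockHeld)
            throws InterruptedException {
        synchronized (lock) {
            events.add("discard-start");
            lockHeld.countDown(); // signal that the monitor is now held
            for (int i = 0; i < files; i++) {
                Thread.sleep(millisPerDelete); // simulated listing + delete
            }
            events.add("discard-end");
        }
    }

    /** Triggering needs the same monitor, so it waits for the discard to finish. */
    void triggerCheckpoint() {
        synchronized (lock) {
            events.add("trigger");
        }
    }

    /** Runs one discard and one trigger concurrently; returns the observed event order. */
    static List<String> run(int files, long millisPerDelete) {
        CoordinatorLockSketch c = new CoordinatorLockSketch();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread discarder = new Thread(() -> {
            try {
                c.completeAndDiscard(files, millisPerDelete, lockHeld);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread trigger = new Thread(c::triggerCheckpoint);
        try {
            discarder.start();
            lockHeld.await(); // start the trigger only once the discard holds the monitor
            trigger.start();
            discarder.join();
            trigger.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(c.events);
    }

    public static void main(String[] args) {
        System.out.println(run(20, 5)); // prints [discard-start, discard-end, trigger]
    }
}
```

In this sketch the trigger is always recorded after the discard ends, so the time a trigger waits grows with the number of files to delete and the latency of each delete.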



> Checkpointing slows down after reaching state.checkpoints.num-retained
> ----------------------------------------------------------------------
>
>                 Key: FLINK-17988
>                 URL: https://issues.apache.org/jira/browse/FLINK-17988
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>         Attachments: flink-conf.yaml, jobmanager.s3a.dmp, jobmanager.s3p.dmp
>
>
> With unaligned checkpoints, this always happens (a new checkpoint is never started or triggered).
> With aligned checkpoints, it happens to some degree, depending on state size and thresholds: checkpoints are delayed by 1 minute. The delay grows very slowly with state size.
>  
> Filesystems: s3p and s3a
> Parallelism: 176, repartition (num stages): 5.
> The number of files in a checkpoint is about 1K (it depends on state.backend.fs.memory-threshold and their size). Size doesn't matter (100K..10G).


