Posted to issues@flink.apache.org by "Roman Khachatryan (Jira)" <ji...@apache.org> on 2020/05/27 20:28:00 UTC

[jira] [Commented] (FLINK-17988) Checkpointing slows down after reaching state.checkpoints.num-retained

    [ https://issues.apache.org/jira/browse/FLINK-17988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118055#comment-17118055 ] 

Roman Khachatryan commented on FLINK-17988:
-------------------------------------------

From the thread stack traces it follows that fs.delete() on both filesystems triggers an object listing on S3:
{code:java}
 at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:895)
 at com.facebook.presto.hive.s3.PrestoS3FileSystem.listPrefix(PrestoS3FileSystem.java:484)
 at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:315)
 at com.facebook.presto.hive.s3.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:450)
 at com.facebook.presto.hive.s3.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:427)
 at org.apache.flink.fs.s3presto.common.HadoopFileSystem.delete(HadoopFileSystem.java:147)
 at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:150)
 at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:86)
 at org.apache.flink.runtime.state.AbstractChannelStateHandle.discardState(AbstractChannelStateHandle.java:55)
 at org.apache.flink.runtime.state.StateUtil$$Lambda$430/787068135.accept(Unknown Source)
 at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:55)
 at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:60)
 at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:236)
 at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:132)
 at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:263)
 at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:218)
 at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1005)
 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:912)
{code}
 
{code:java}
 at org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:1255)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2223)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
 at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.delete(HadoopFileSystem.java:147)
 at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:150)
 at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:86)
 at org.apache.flink.runtime.state.AbstractChannelStateHandle.discardState(AbstractChannelStateHandle.java:60)
 at org.apache.flink.runtime.state.StateUtil$$Lambda$428/480418082.accept(Unknown Source)
 at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:55)
 at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:60)
 at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:236)
 at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:132)
 at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:263)
 at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:218)
 at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1005)
 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:912)
{code}
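
In other words, each FileStateHandle.discardState() pays for a getFileStatus()/listObjects() round-trip before the actual DELETE, and per the traces above these calls run serially on the completePendingCheckpoint() path when the oldest retained checkpoint is subsumed. A minimal, hedged sketch (the bucket and checkpoint path are placeholders, not taken from the issue) for measuring the per-file delete cost through Flink's FileSystem abstraction:
{code:java}
// Hedged sketch, not from the issue: times fs.delete() per state file against
// an S3 checkpoint directory via Flink's FileSystem API. Bucket/path are
// placeholders; absolute numbers depend on region, credentials, request rates.
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class DeleteLatencyProbe {
    public static void main(String[] args) throws Exception {
        // Layout mimicking a retained checkpoint with many small state files.
        Path chkDir = new Path("s3://some-bucket/checkpoints/<job-id>/chk-42");
        FileSystem fs = chkDir.getFileSystem();

        long start = System.nanoTime();
        int deleted = 0;
        for (FileStatus f : fs.listStatus(chkDir)) {
            // Each non-recursive delete goes through getFileStatus() in both
            // s3p and s3a, i.e. at least one extra HEAD/LIST request per file.
            fs.delete(f.getPath(), false);
            deleted++;
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("deleted %d files in %d ms (%.1f ms/file)%n",
                deleted, elapsedMs,
                deleted == 0 ? 0.0 : (double) elapsedMs / deleted);
    }
}
{code}
With roughly 1K files per checkpoint (see the issue description below), even a few tens of milliseconds per listing adds up to the observed slowdown once state.checkpoints.num-retained is reached and every new checkpoint has to discard an old one.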

> Checkpointing slows down after reaching state.checkpoints.num-retained
> ----------------------------------------------------------------------
>
>                 Key: FLINK-17988
>                 URL: https://issues.apache.org/jira/browse/FLINK-17988
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>         Attachments: flink-conf.yaml, jobmanager.s3a.dmp, jobmanager.s3p.dmp
>
>
> With Unaligned checkpoints, this always happens.
> With Aligned checkpoints it happens to some degree, depending on state size and thresholds: checkpointing is delayed by 1 minute.
>  
> Filesystems: s3p and s3a
> Parallelism: 176, repartition (num stages): 5.
> The number of files per checkpoint is about 1K (depending on state.backend.fs.memory-threshold and the state size).
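
For reference, a hedged sketch of how the two settings mentioned above interact (the option keys are the real Flink config keys; the values are illustrative only, not the ones from the attached flink-conf.yaml):
{code:java}
import org.apache.flink.configuration.Configuration;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Once this many completed checkpoints are retained, every newly
        // completed checkpoint subsumes the oldest one, i.e. triggers one
        // fs.delete() per state file of that checkpoint (see the traces above).
        conf.setInteger("state.checkpoints.num-retained", 3);
        // State below this threshold is kept inline in the checkpoint metadata
        // rather than written as a separate file; a higher value means fewer
        // small files and therefore fewer LIST-per-delete round-trips.
        conf.setString("state.backend.fs.memory-threshold", "20kb");
        System.out.println(conf);
    }
}
{code}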



--
This message was sent by Atlassian Jira
(v8.3.4#803005)