Posted to user@flink.apache.org by Vijayendra Yadav <co...@gmail.com> on 2022/03/16 21:01:52 UTC

Flink Kinesis checkpoint failures long running service

Hi Flink Team,

I am using Flink 1.11 with the Kinesis consumer and S3 streaming file writes,
with an S3 checkpoint backend (*FsStateBackend*). This is a long-running
streaming service.

Usually a couple of checkpoints fail, which is okay. But after a week or so
of the process running, checkpoint failures become irrecoverable: the
application keeps running, but in a bad state, and data flow blocks.

I don't think it is a timeout issue, because the expiry happens long before
the checkpoint timeout is hit.

Refer to the graph below:
[image: image.png]

Flink checkpoint configurations are as below.
Note: time units are in milliseconds.

flink.checkpoint.interval=10000
flink.checkpoint.minPauseInterval=500
flink.checkpoint.Timeout=10000
flink.checkpoint.maxConcurrent=1
flink.checkpoint.preferCheckPoint=true
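
For reference, these look like application-level property keys from my config; assuming they map one-to-one onto Flink's checkpointing API, the equivalent Flink 1.11 job setup would be roughly the following sketch (the checkpoint path is a placeholder, not our real bucket):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // flink.checkpoint.interval=10000 (ms)
        env.enableCheckpointing(10_000L);
        // FsStateBackend writing checkpoint data to S3 (placeholder path)
        env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoint"));

        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setMinPauseBetweenCheckpoints(500);   // flink.checkpoint.minPauseInterval
        cfg.setCheckpointTimeout(10_000);         // flink.checkpoint.Timeout
        cfg.setMaxConcurrentCheckpoints(1);       // flink.checkpoint.maxConcurrent
        cfg.setPreferCheckpointForRecovery(true); // flink.checkpoint.preferCheckPoint
        // The "tolerable failure threshold" mentioned in the exception below is
        // controlled by this setting; in Flink 1.11 it defaults to 0, i.e. any
        // checkpoint failure beyond the threshold fails the job:
        cfg.setTolerableCheckpointFailureNumber(0);
    }
}
```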


kinesis.shard.getrecords.max=10000
kinesis.shard.getrecords.interval=10000
kinesis.initial.position=LATEST

Parallelism=16
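
Similarly, the Kinesis settings appear to correspond to the consumer properties in flink-connector-kinesis; a minimal consumer sketch under that assumption (stream name and region are placeholders):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisSourceSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(16); // Parallelism=16

        Properties props = new Properties();
        props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder
        // kinesis.initial.position=LATEST
        props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        // kinesis.shard.getrecords.max=10000
        props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
        // kinesis.shard.getrecords.interval=10000 (ms)
        props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "10000");

        DataStream<String> stream = env.addSource(
                new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), props));
        stream.print();
        env.execute("kinesis-to-s3");
    }
}
```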

*EXCEPTION On Job:*
2022-03-16 03:05:58,246 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 536036 of job 5c658fc5f325f40baf063a78a20b1bb2 *expired before completing*.
2022-03-16 03:05:58,248 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: *Exceeded checkpoint tolerable failure threshold*.

2022-03-16 03:03:22,641 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 13 ...
2022-03-16 03:03:22,641 WARN  *org.apache.hadoop.fs.s3a.S3ABlockOutputStream - Interrupted object upload*
java.lang.InterruptedException
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.putObject(S3ABlockOutputStream.java:441)
    at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:360)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
    at org.apache.flink.fs.s3hadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:306)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.fail(ChannelStateCheckpointWriter.java:225)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$abort$4(ChannelStateWriteRequest.java:89)
    at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:176)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:73)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:52)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:94)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:74)
    at java.lang.Thread.run(Thread.java:750)
2022-03-16 03:03:22,641 WARN  org.apache.hadoop.fs.s3a.S3AInstrumentation - Closing output stream statistics while data is still marked as pending upload in OutputStreamStatistics{blocksSubmitted=1, blocksInQueue=1, blocksActive=0, blockUploadsCompleted=0, blockUploadsFailed=0, bytesPendingUpload=1107015, bytesUploaded=0, blocksAllocated=1, blocksReleased=1, blocksActivelyAllocated=0, exceptionsInMultipartFinalize=0, transferDuration=0 ms, queueDuration=0 ms, averageQueueTime=0 ms, totalUploadDuration=0 ms, effectiveBandwidth=0.0 bytes/s}
2022-03-16 03:03:22,641 WARN  org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Cannot delete closed and discarded state stream for s3://aeg-prod-bigdatadl-meta/flink/checkpoint/ams/5c658fc5f325f40baf063a78a20b1bb2/chk-535996/138d89f7-1514-42eb-b4bd-5e89d87c5a02.
java.io.InterruptedIOException: getFileStatus on s3://aeg-prod-bigdatadl-meta/flink/checkpoint/ams/5c658fc5f325f40baf063a78a20b1bb2/chk-535996/138d89f7-1514-42eb-b4bd-5e89d87c5a02: com.amazonaws.AbortedException:
    at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2187)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.delete(HadoopFileSystem.java:147)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:150)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:107)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:311)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.fail(ChannelStateCheckpointWriter.java:225)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$abort$4(ChannelStateWriteRequest.java:89)
    at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:176)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:73)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:52)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:94)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:74)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazonaws.AbortedException:
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:862)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:740)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$4(S3AFileSystem.java:1235)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:280)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1232)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2169)
    ... 14 more
Caused by: com.amazonaws.http.timers.client.SdkInterruptedException
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:917)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:903)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1097)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
    ... 26 more

*Thanks,*
*Vijay*