Posted to issues@flink.apache.org by "Stefan Richter (JIRA)" <ji...@apache.org> on 2018/05/07 07:51:00 UTC

[jira] [Commented] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException

    [ https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465575#comment-16465575 ] 

Stefan Richter commented on FLINK-9302:
---------------------------------------

If you take a look at the stack trace, it tells you that the root cause of the problem is the AWS SDK having trouble with the connection to S3, which is why it could not complete writing the checkpoint:

{{Caused by: com.amazonaws.SdkClientException: Unable to complete multi-part upload. Individual part upload failed : Unable to execute HTTP request: Broken pipe (Write failed)}}

This does not look like a Flink problem, more likely a hiccup with the S3 connection. There are also proposed solutions for this exception if you google it.
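
If the broken pipes keep recurring, one thing worth trying is to make the S3 client more patient, so transient connection resets get retried instead of failing the whole multi-part upload. A minimal sketch, assuming the bundled Presto S3 filesystem honors the usual PrestoS3FileSystem Hadoop configuration keys (a custom-shaded build such as com.facebook.presto.s3fs may use different key names, so verify against your jar; values are illustrative):

{code}
# Hadoop configuration for PrestoS3FileSystem -- values are illustrative
presto.s3.max-client-retries = 10   # retries inside the AWS SDK client
presto.s3.max-error-retries = 10    # retries in the Presto S3 layer
presto.s3.max-retry-time = 10m      # total budget for retrying one request
presto.s3.socket-timeout = 2m       # the default is only a few seconds
presto.s3.connect-timeout = 2m
{code}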

> Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9302
>                 URL: https://issues.apache.org/jira/browse/FLINK-9302
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Narayanan Arunachalam
>            Priority: Major
>
> *state backend: filesystem*
> *checkpoint.mode: EXACTLY_ONCE*
> +dag:+
> val streams = sEnv
> .addSource(makeKafkaSource(config))
> .map(makeEvent)
> .keyBy(_.get(EVENT_GROUP_ID))
> .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
> .trigger(PurgingTrigger.of(EventTimeTrigger.create()))
> .apply(makeEventsList)
> .addSink(makeNoOpSink)
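> For context, a minimal Scala sketch of how the backend and mode above are typically wired up (the S3 URI and checkpoint interval are illustrative, not taken from this job):
> {code}
> import org.apache.flink.runtime.state.filesystem.FsStateBackend
> import org.apache.flink.streaming.api.CheckpointingMode
>
> // Filesystem state backend writing checkpoint data to S3, exactly-once mode.
> sEnv.setStateBackend(new FsStateBackend("s3://<bucket>/checkpoints"))
> sEnv.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)
> {code}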
>  * The job runs fine and checkpoints succeed for a few hours.
>  * Later it fails with the checkpoint error below.
>  * Once the job is recovered from the last successful checkpoint, it continues to fail with the same checkpoint error.
>  * This persists until the job is restarted either with no checkpoint state or from the checkpoint prior to the last good one (a resume sketch follows the stack trace).
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).}
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
> ... 5 more
> Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
> at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
> ... 5 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
> at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
> ... 7 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:385)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:397)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
> at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
> ... 5 more
> Caused by: java.io.IOException: com.amazonaws.SdkClientException: Unable to complete multi-part upload. Individual part upload failed : Unable to execute HTTP request: Broken pipe (Write failed)
> at com.facebook.presto.s3fs.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1267)
> at com.facebook.presto.s3fs.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:1231)
> at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:368)
> ... 11 more
> Caused by: com.amazonaws.SdkClientException: Unable to complete multi-part upload. Individual part upload failed : Unable to execute HTTP request: Broken pipe (Write failed)
> at com.amazonaws.services.s3.transfer.internal.CompleteMultipartUpload.collectPartETags(CompleteMultipartUpload.java:127)
> at com.amazonaws.services.s3.transfer.internal.CompleteMultipartUpload.call(CompleteMultipartUpload.java:89)
> at com.amazonaws.services.s3.transfer.internal.CompleteMultipartUpload.call(CompleteMultipartUpload.java:40)
> ... 4 more
> Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Broken pipe (Write failed)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4330)
> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4277)
> at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3307)
> at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3292)
> at com.amazonaws.services.s3.transfer.internal.UploadPartCallable.call(UploadPartCallable.java:33)
> at com.amazonaws.services.s3.transfer.internal.UploadPartCallable.call(UploadPartCallable.java:23)
> ... 4 more
> Caused by: java.net.SocketException: Broken pipe (Write failed)
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
> at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
> at sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:886)
> at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:857)
> at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123)
> at org.apache.http.impl.io.SessionOutputBufferImpl.streamWrite(SessionOutputBufferImpl.java:124)
> at org.apache.http.impl.io.SessionOutputBufferImpl.flushBuffer(SessionOutputBufferImpl.java:136)
> at org.apache.http.impl.io.SessionOutputBufferImpl.write(SessionOutputBufferImpl.java:167)
> at org.apache.http.impl.io.ContentLengthOutputStream.write(ContentLengthOutputStream.java:113)
> at org.apache.http.entity.InputStreamEntity.writeTo(InputStreamEntity.java:144)
> at com.amazonaws.http.RepeatableInputStreamRequestEntity.writeTo(RepeatableInputStreamRequestEntity.java:160)
> at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156)
> at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:160)
> at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238)
> at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doSendRequest(SdkHttpRequestExecutor.java:63)
> at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
> at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
> at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
> at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
> at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
> at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
> ... 16 more
> [CIRCULAR REFERENCE:java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle]
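> For reference, a minimal sketch of the restart described above, resuming from a retained (externalized) checkpoint via the Flink CLI; the path is a placeholder, not taken from this job:
> {code}
> # Requires externalized checkpoints to be enabled on the job.
> bin/flink run -s <checkpoint-metadata-path> your-job.jar
> {code}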


