Posted to user@flink.apache.org by Hao Sun <ha...@zendesk.com> on 2017/12/13 04:28:48 UTC

Could not flush and close the file system output stream to s3a, is this fixed?

https://issues.apache.org/jira/browse/FLINK-7590

I have a similar situation with Flink 1.3.2 on K8S

=========
2017-12-13 00:57:12,403 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3) (6ad009755a6009975d197e75afa05e14) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
        ... 7 more
    Caused by: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
        at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
        ... 5 more
    Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code: RequestTimeout, AWS Error Message: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2cBkumOK5EX4TYgt+LVErSOShxPkZmGrCvmT39FHDbIryc=
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
        at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
        ... 4 more
    [CIRCULAR REFERENCE:java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle]
2017-12-13 00:57:12,404 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job KafkaDemo maxwell.tickets (env:production) (d5a8b2ab61625cf0aa1e66360b7ad0af) switched from state RUNNING to FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
        ... 7 more
    Caused by: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
        at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
        ... 5 more
    Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code: RequestTimeout, AWS Error Message: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2cBkumOK5EX4TYgt+LVErSOShxPkZmGrCvmT39FHDbIryc=
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
        at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
        ... 4 more
    [CIRCULAR REFERENCE:java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle]
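The `[CIRCULAR REFERENCE:...]` line at the end of each trace is printed by `Throwable.printStackTrace` itself: it remembers every throwable it has already printed and emits this marker instead of printing the same object twice. Here the same `IOException` is reachable twice, once through the suppressed "Could not properly cancel managed operator state future." exception and once as the cause of the outer `ExecutionException`, which suggests that the checkpoint path and the cleanup path both observed the same failed S3 upload rather than two separate failures. The sketch below reproduces that shape with plain JDK classes; the class name and messages are illustrative only (this is not Flink code), and the exact text of the marker line may differ slightly between JDK versions.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ExecutionException;

// Illustrative only: rebuilds the exception shape of the trace above with plain
// JDK classes. The class name and messages are made up; none of this is Flink code.
class CircularReferenceDemo {

    static String render() {
        // One failure object shared by both paths (stands in for the failed S3 upload).
        IOException uploadFailure =
                new IOException("Could not flush and close the file system output stream");

        // Cleanup path: cancelling the state future re-reads the same failed future.
        Exception cancelFailure = new Exception(
                "Could not properly cancel managed operator state future.",
                new ExecutionException(uploadFailure));

        // Checkpoint path: the same IOException, wrapped by a second ExecutionException,
        // with the cleanup failure attached as a suppressed exception.
        ExecutionException checkpointFailure = new ExecutionException(uploadFailure);
        checkpointFailure.addSuppressed(cancelFailure);

        // printStackTrace prints suppressed exceptions before the cause, so the
        // IOException is first printed inside the "Suppressed:" block; when the outer
        // cause refers to the same object again, only a CIRCULAR REFERENCE line is printed.
        StringWriter out = new StringWriter();
        checkpointFailure.printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(render());
    }
}
```

Running `main` prints a trace whose last line is the CIRCULAR REFERENCE marker, the same pattern that closes each log entry above.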

Re: Could not flush and close the file system output stream to s3a, is this fixed?

Posted by Stephan Ewen <se...@apache.org>.
@Hao Can you provide a better-formatted stack trace? It is very hard to read
as it is...
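If the original log is no longer available, the line breaks of a flattened Java trace can usually be restored mechanically, since `printStackTrace` only starts a new line before `at <frame>(`, `Caused by:`, `Suppressed:`, `... N more` and `[CIRCULAR REFERENCE`. The helper below is a hypothetical sketch of that idea using only `java.util.regex`; it is not a Flink, log4j or JDK utility.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not a Flink, log4j or JDK utility): puts the line breaks
// back into a Java stack trace that was flattened onto one line.
final class TraceReflow {
    private TraceReflow() {}

    // Any run of whitespace, including line breaks added by mail wrapping.
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");
    // A space that precedes "Caused by: " or "Suppressed: ".
    private static final Pattern CAUSE = Pattern.compile(" (?=(?:Caused by|Suppressed): )");
    // A space that precedes a frame such as "at java.lang.Thread.run(".
    private static final Pattern FRAME = Pattern.compile(" (?=at [\\w$.]+\\()");
    // A space that precedes "... 5 more" or "[CIRCULAR REFERENCE".
    private static final Pattern TRAILER =
            Pattern.compile(" (?=\\.\\.\\. \\d+ more|\\[CIRCULAR REFERENCE)");

    static String reflow(String flattened) {
        // Collapse all whitespace to single spaces so wrapped and unwrapped input look alike.
        String s = WHITESPACE.matcher(flattened.trim()).replaceAll(" ");
        // Causes and suppressed exceptions start at the left margin.
        s = CAUSE.matcher(s).replaceAll("\n");
        // Frames, "... N more" and circular-reference lines are indented with a tab.
        s = FRAME.matcher(s).replaceAll("\n\t");
        s = TRAILER.matcher(s).replaceAll("\n\t");
        return s;
    }

    public static void main(String[] args) {
        String flat = "java.lang.Exception: boom at a.B.c(B.java:1)"
                + " Caused by: java.io.IOException: x at d.E.f(E.java:2) ... 1 more";
        System.out.println(reflow(flat));
    }
}
```

It cannot restore the extra indentation Java uses inside `Suppressed:` blocks, and it cannot repair class names that were split in the middle of a word; for those, the original log file is still the better source.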

On Thu, Dec 14, 2017 at 11:05 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Bowen Li (in CC) closed the issue, but there is no fix (or at least none is
> linked in the JIRA).
> Maybe it was resolved in another issue, or it can be resolved differently.
>
> @Bowen, can you comment on how to fix this problem? Will it work in Flink
> 1.4.0?
>
> Thank you,
> Fabian

Re: Could not flush and close the file system output stream to s3a, is this fixed?

Posted by Bowen Li <bo...@offerupnow.com>.
Hi,

The problem reported in FLINK-7590 only happened once on our end. As you can
see from its comments, we suspected it was caused by the AWS SDK or Hadoop's
s3a implementation, neither of which we have control over.

Flink 1.4.0 has its own S3 file system implementations. I haven't tried them
yet.
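Since the root cause in the trace above is an S3 `RequestTimeout` (a 400 response saying the socket was idle for too long), it can help to recognize this particular failure programmatically, for example when triaging checkpoint failures. The sketch below is hypothetical and not part of Flink or the AWS SDK: it follows `getCause()` down to the deepest throwable, stops if the chain loops, and then checks whether that throwable's string form mentions `RequestTimeout`, as the `AmazonS3Exception` line above does. Matching on text avoids a compile-time dependency on the AWS SDK, at the price of being only a heuristic.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper (not part of Flink or the AWS SDK).
final class S3TimeoutCheck {
    private S3TimeoutCheck() {}

    /** Follows getCause() to the deepest throwable, stopping early if the chain loops. */
    static Throwable rootCause(Throwable t) {
        // Identity-based set, so distinct but equal-looking exceptions are not confused.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Heuristic: true if the deepest cause prints like the AmazonS3Exception in the
     * trace, i.e. its toString() contains the S3 error code "RequestTimeout".
     */
    static boolean looksLikeS3RequestTimeout(Throwable t) {
        return rootCause(t).toString().contains("RequestTimeout");
    }
}
```

A match only says the failure looks like the transient timeout seen here; it says nothing about whether the SDK or the s3a layer is at fault.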


On Thu, Dec 14, 2017 at 2:05 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Bowen Li (in CC) closed the issue but there is no fix (or at least it is
> not linked in the JIRA).
> Maybe it was resolved in another issue or can be differently resolved.
>
> @Bowen, can you comment on how to fix this problem? Will it work in Flink
> 1.4.0?
>
> Thank you,
> Fabian
>
> 2017-12-13 5:28 GMT+01:00 Hao Sun <ha...@zendesk.com>:
>
>> https://issues.apache.org/jira/browse/FLINK-7590
>>
>> I have a similar situation with Flink 1.3.2 on K8S
>>
>> =========
>> 2017-12-13 00:57:12,403 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>> - Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3) (6ad009755a6009975d197e75afa05e14)
>> switched from RUNNING to FAILED. AsynchronousException{java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3).} at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:970) at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3). ... 6 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:906) ... 5 more Suppressed:
>> java.lang.Exception: Could not properly cancel managed operator state
>> future. at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:98) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.cleanup(StreamTask.java:1023) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:961) ... 5 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>> at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:96) ... 7 more Caused by:
>> java.io.IOException: Could not flush and close the file system output
>> stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at org.apache.flink.runtime.state
>> .filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutpu
>> tStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) at
>> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOC
>> allable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>> at org.apache.flink.runtime.state.DefaultOperatorStateBackend$
>> 1.performOperation(DefaultOperatorStateBackend.java:270) at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
>>     at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
>>     ... 5 more
>> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code: RequestTimeout, AWS Error Message: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2cBkumOK5EX4TYgt+LVErSOShxPkZmGrCvmT39FHDbIryc=
>>     at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>>     at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
>>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>>     ... 4 more
>> [CIRCULAR REFERENCE:java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle]
>> 2017-12-13 00:57:12,404 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job KafkaDemo maxwell.tickets (env:production) (d5a8b2ab61625cf0aa1e66360b7ad0af) switched from state RUNNING to FAILING.
>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).}
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 803 for operator Source: KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets) -> FixedDelayWatermark(maxwell.tickets) -> MaxwellFPSEvent->InfluxDBData(maxwell.tickets) -> Sink: influxdbSink(maxwell.tickets) (1/3).
>>     ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
>>     ... 5 more
>>     Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>>         ... 5 more
>>     Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
>>         ... 7 more
>> Caused by: java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle
>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>>     at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
>>     at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
>>     ... 5 more
>> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 751174B20E6C6A0A, AWS Error Code: RequestTimeout, AWS Error Message: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed., S3 Extended Request ID: dADBPVGflB29xtFb7ydxD2SU3LzHw2cBkumOK5EX4TYgt+LVErSOShxPkZmGrCvmT39FHDbIryc=
>>     at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>>     at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
>>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>>     at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>>     at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>>     ... 4 more
>> [CIRCULAR REFERENCE:java.io.IOException: Could not flush and close the file system output stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc in order to obtain the stream state handle]
>>
>>
>

Re: Could not flush and close the file system output stream to s3a, is this fixed?

Posted by Fabian Hueske <fh...@gmail.com>.
Bowen Li (in CC) closed the issue, but there is no fix (or at least none is
linked in the JIRA).
Maybe it was resolved in another issue, or it can be resolved differently.

@Bowen, can you comment on how to fix this problem? Will it work in Flink
1.4.0?

Thank you,
Fabian

2017-12-13 5:28 GMT+01:00 Hao Sun <ha...@zendesk.com>:

> https://issues.apache.org/jira/browse/FLINK-7590
>
> I have a similar situation with Flink 1.3.2 on K8S
>