Posted to user@flink.apache.org by Evgeniy Lyutikov <eb...@avito.ru> on 2022/10/06 09:25:00 UTC

Sometimes checkpoints to s3 fail

Hello all.
I can’t figure out an intermittent problem: sometimes checkpoints stop completing altogether, and sometimes only every other checkpoint completes.
Flink 1.14.4 in Kubernetes application mode.
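For context, the checkpointing setup is roughly of the following shape (a minimal sketch: the RocksDB state backend and the s3p:// checkpoint path are inferred from the stack trace below, while the checkpoint interval and the rest of the job code are assumptions, not taken from the actual job):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Incremental RocksDB checkpoints: SST files are uploaded to the
        // shared/ directory that appears in the failing path in the log below.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        // The interval here is an assumption; the real value is not in this message.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // The s3p:// scheme selects the flink-s3-fs-presto plugin seen in the stack trace.
        env.getCheckpointConfig().setCheckpointStorage(
                "s3p://flink-checkpoints/k8s-checkpoint-job-name");
    }
}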


2022-10-06 09:08:04,731 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 18314 (type=CHECKPOINT) @ 1665047284716 for job 00000000000000000000000000000000.
2022-10-06 09:11:29,130 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 18314 by task 048169f0e3c2efd473d3cef9c9d2cd70 of job 00000000000000000000000000000000 at job-name-taskmanager-3-1 @ 10.109.0.168 (dataPort=43795).
org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:301) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 18314 for operator Process rec last clicks -> Cast rec last clicks type (30/44)#0.
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not flush to file and close the file system output stream to s3p://flink-checkpoints/k8s-checkpoint-job-name/00000000000000000000000000000000/shared/7c09fcf1-49b9-4b72-b756-81cd7778e396 in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
        at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
        at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: Could not flush to file and close the file system output stream to s3p://flink-checkpoints/k8s-checkpoint-job-name/00000000000000000000000000000000/shared/7c09fcf1-49b9-4b72-b756-81cd7778e396 in order to obtain the stream state handle
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:373) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:143) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:101) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
        ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: com.amazonaws.services.s3.model.AmazonS3Exception: This multipart completion is already in progress (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: tx000000000000000ced9f8-00633e9bc1-18489a52-default; S3 Extended Request ID: 18489a52-default-default; Proxy: null), S3 Extended Request ID: 18489a52-default-default
        at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1278) ~[?:?]
        at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:1226) ~[?:?]
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?]
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[?:?]
        at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?]
        at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:354) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:143) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:101) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
        ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: This multipart completion is already in progress (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: tx000000000000000ced9f8-00633e9bc1-18489a52-default; S3 Extended Request ID: 18489a52-default-default; Proxy: null)



________________________________
“This message contains confidential information/commercial secret. If you are not the intended addressee of this message you may not copy, save, print or forward it to any third party and you are kindly requested to destroy this message and notify the sender thereof by email.”

Re: Sometimes checkpoints to s3 fail

Posted by Matthias Pohl via user <us...@flink.apache.org>.
Hi Evgeniy,
is it Ceph that you're using as an S3 server? All the Google search
results for that error message point to Ceph. Could it be that there's
a problem with the version of the underlying system? The stack trace you
provided suggests that Flink struggles to close the file and, therefore,
fails to create the checkpoint.
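As a side note, if the failures remain sporadic, the job can be allowed to survive a few declined checkpoints instead of failing over on the first one; a minimal, illustrative sketch (the threshold of 3 is an arbitrary example value, not a recommendation from this thread):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointToleranceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With the default of 0, any failed or declined checkpoint triggers a
        // job failover; raising the tolerance lets a few sporadic S3 upload
        // failures pass without restarting the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}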

Best,
Matthias

On Thu, Oct 6, 2022 at 11:25 AM Evgeniy Lyutikov <eb...@avito.ru>
wrote:
