Posted to user@beam.apache.org by Gayan Weerakutti <ga...@linuxdeveloper.space> on 2021/09/27 04:08:11 UTC
Error while processing checkpoint acknowledgement message
Hi,
I have an Apache Beam application deployed on Amazon KDA (a managed Flink
cluster). The application basically reads from Kinesis, windows the data into
30-second fixed windows, and then publishes it to Pub/Sub.
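To make the windowing step concrete, here is a plain-Java sketch of how an event timestamp (epoch milliseconds) maps to a 30-second fixed window with zero offset. It only illustrates the arithmetic behind FixedWindows.of(Duration.standardSeconds(30)) for non-negative timestamps. The class and method names are hypothetical, and this is not Beam's implementation.

```java
// Illustration only: maps an event timestamp (epoch millis) to the half-open
// fixed window [start, start + size) that contains it, assuming a zero offset.
final class FixedWindowSketch {

    // Start of the window containing timestampMillis. Math.floorMod never
    // returns a negative remainder for a positive size.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30 seconds, matching Duration.standardSeconds(30)
        long[] timestamps = {0L, 29_999L, 30_000L, 61_234L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", "
                    + windowEnd(ts, size) + ")");
        }
    }
}
```

Elements at 0 ms and 29,999 ms share the window [0, 30000), while 30,000 ms starts the next one.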
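import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// KinesisIORead, GroupedMessage and PubSubWrite are application-defined.
pipeline
    .apply("Read from Kinesis", new KinesisIORead())
    .apply("Windowing",
        Window.into(FixedWindows.of(Duration.standardSeconds(30))))
    .apply(WithKeys.of(DUMMY_KEY))      // same constant key for every element
    .apply(GroupIntoBatches.ofSize(5))  // batches of up to 5 per key and window
    .apply(Values.create())
    .apply("Map values to single object", ParDo.of(new GroupedMessage()))
    .apply("Write to Pub/Sub", new PubSubWrite());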
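Because WithKeys.of(DUMMY_KEY) gives every element the same key, GroupIntoBatches.ofSize(5) keeps one buffer per window for the whole stream, so this stateful step works on a single key and therefore on a single parallel instance. The simplified in-memory model below shows the resulting batch boundaries: a batch is emitted as soon as five elements accumulate, and any remainder is flushed when the window closes. It is a sketch with hypothetical names; Beam's actual transform keeps the buffers in per-key state and flushes them with timers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of WithKeys.of(key) followed by GroupIntoBatches.ofSize(n).
// Not Beam code: the real transform uses per-key state plus window-expiry timers.
final class BatchingSketch {

    static final class Element {
        final String key;
        final long windowStart; // start of the element's fixed window
        final String value;

        Element(String key, long windowStart, String value) {
            this.key = key;
            this.windowStart = windowStart;
            this.value = value;
        }
    }

    static List<List<String>> groupIntoBatches(List<Element> elements, int batchSize) {
        // One buffer per (key, window); insertion order is kept for the final flush.
        Map<String, List<String>> buffers = new LinkedHashMap<>();
        List<List<String>> batches = new ArrayList<>();
        for (Element e : elements) {
            List<String> buffer =
                    buffers.computeIfAbsent(e.key + "@" + e.windowStart, k -> new ArrayList<>());
            buffer.add(e.value);
            if (buffer.size() == batchSize) { // full batch: emit immediately
                batches.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        // Stand-in for window expiry: flush any partially filled buffers.
        for (List<String> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                batches.add(new ArrayList<>(buffer));
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        String dummyKey = "dummy"; // hypothetical stand-in for DUMMY_KEY
        List<Element> input = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            input.add(new Element(dummyKey, 0L, "m" + i)); // window [0 s, 30 s)
        }
        input.add(new Element(dummyKey, 30_000L, "m7"));   // window [30 s, 60 s)
        System.out.println(groupIntoBatches(input, 5));
        // prints [[m0, m1, m2, m3, m4], [m5, m6], [m7]]
    }
}
```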
I'm using beam-sdks-java-core:2.31.0, beam-runners-flink-1.11:2.31.0 and
beam-sdks-java-io-kafka:2.31.0.
The checkpointing configuration is as follows:
CheckpointingEnabled: true
CheckpointInterval: 60000 (ms)
MinPauseBetweenCheckpoints: 5000 (ms)
FasterCopy is also enabled, and 16 task slots are allocated to the application.
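The settings above imply a rough checkpoint timeline. The sketch below uses a deliberately simplified model, not Flink's actual scheduler: checkpoint 1 is assumed to fire one interval after a clean start, each later checkpoint fires no earlier than one interval after the previous trigger and no earlier than MinPauseBetweenCheckpoints after the previous one completed, only one checkpoint is in flight at a time, and the checkpoint durations are hypothetical. Under those assumptions, with fast checkpoints, checkpoint 20 (the chk-20 in the exception below) would be triggered about 20 minutes after the job starts, and slow checkpoints push it later.

```java
// Simplified checkpoint timeline (not Flink's scheduler). Assumptions: checkpoint 1
// fires one interval after a clean start, only one checkpoint is in flight, and
// every checkpoint takes the same (hypothetical) time to complete.
final class CheckpointScheduleSketch {

    static long triggerTimeMillis(int checkpointId, long intervalMs, long minPauseMs,
                                  long durationMs) {
        long trigger = intervalMs; // checkpoint 1
        for (int id = 2; id <= checkpointId; id++) {
            long completed = trigger + durationMs;
            // Next trigger: at least one interval after the previous trigger and
            // at least minPause after the previous checkpoint completed.
            trigger = Math.max(trigger + intervalMs, completed + minPauseMs);
        }
        return trigger;
    }

    public static void main(String[] args) {
        long interval = 60_000L; // CheckpointInterval
        long minPause = 5_000L;  // MinPauseBetweenCheckpoints
        long fast = triggerTimeMillis(20, interval, minPause, 2_000L);  // 2 s checkpoints
        long slow = triggerTimeMillis(20, interval, minPause, 70_000L); // 70 s checkpoints
        System.out.println("chk-20 with 2 s checkpoints:  " + fast / 60_000.0 + " min"); // 20.0
        System.out.println("chk-20 with 70 s checkpoints: " + slow / 60_000.0 + " min"); // 24.75
    }
}
```

With 70-second checkpoints the minimum pause dominates, so consecutive triggers end up 75 seconds apart instead of 60.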
The pipeline usually runs fine for about 15-20 minutes, then starts producing
intermittent checkpoint failures.
The exception is:
Error while processing checkpoint acknowledgement message
org.apache.flink.util.SerializedThrowable: s3://3db4bd0e0169500d35dc925c1fa9414b79d097b8/a15fb789b6dab24701c19f42a61b1cf7-939927294066-1628593971563/checkpoints/a15fa789b6dab24701c19f41a61b1cf7/chk-20/_metadata already exists
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:758) ~[?:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141) ~[?:?]
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[?:?]
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:169) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:306) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    ... 8 more
Wrapped by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 20. Failure reason: Failure to finalize checkpoint.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:819) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
But I can't seem to find any Flink metric that correlates with these
intermittent checkpoint failures.
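The root exception says the _metadata object for checkpoint 20 already existed when the checkpoint coordinator tried to create it. Judging from the frames, FsCheckpointMetadataOutputStream opens that file through S3AFileSystem.create without overwrite, so any earlier write to the same chk-20/_metadata key makes finalization fail. The sketch below is only a local-filesystem analogy, not Flink or S3A code: it uses java.nio's StandardOpenOption.CREATE_NEW, which likewise refuses to create a file that already exists. The class name and the temporary directory layout are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Local-filesystem analogy only (not Flink or S3A code): create a checkpoint's
// _metadata file in "fail if it already exists" mode, like a no-overwrite create.
final class NoOverwriteSketch {

    // Returns true if _metadata was created, false if it already existed.
    static boolean writeMetadata(Path checkpointDir, byte[] payload) throws IOException {
        Files.createDirectories(checkpointDir);
        Path metadata = checkpointDir.resolve("_metadata");
        try (OutputStream out = Files.newOutputStream(metadata,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            out.write(payload);
            return true;
        } catch (FileAlreadyExistsException e) {
            System.out.println(e.getFile() + " already exists");
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical layout mirroring .../checkpoints/<job-id>/chk-20/_metadata
        Path chk20 = Files.createTempDirectory("checkpoints").resolve("chk-20");
        byte[] payload = "metadata".getBytes(StandardCharsets.UTF_8);
        System.out.println("first create succeeded: " + writeMetadata(chk20, payload));
        System.out.println("second create succeeded: " + writeMetadata(chk20, payload));
    }
}
```

The first call creates the file; the second reports that it already exists and returns false, which is the local equivalent of the "already exists" failure above.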
I'd really appreciate any insights on troubleshooting this.
--
Thanks & regards,
Gayan Weerakutti
linkedin.com/in/gayanweerakutti