You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Gayan Weerakutti <ga...@linuxdeveloper.space> on 2021/09/27 04:08:11 UTC

Error while processing checkpoint acknowledgement message

Hi,

I have a Apache Beam application deployed on Amazon KDA (Managed Flink 
Cluster). The application basically reads from Kinesis, window data into 
a fixed duration of size ~30s, then publish data back to PubSub.

        pipeline
                 .apply("Read from Kinesis",  new KinesisIORead())
                 .apply("Windowing", 
Window.into(FixedWindows.of(Duration.standardSeconds(30))))
                 .apply(WithKeys.of(DUMMY_KEY))
                 .apply(GroupIntoBatches.ofSize(5))
                 .apply(Values.create())
                 .apply("Map values to single object", ParDo.of(new 
GroupedMessage()))
                 .apply("Write to Pub/Sub", new PubSubWrite()));

I'm using:

beam-sdks-java-core:2.31.0, beam-runners-flink-1.11:2.31.0, 
beam-sdks-java-io-kafka:2.31.0


Checkpointing configs are as follows:

CheckpointingEnabled: true,
CheckpointInterval: 60000,
MinPauseBetweenCheckpoints: 5000

FasterCopy is also enabled.

16 task slots are allocated for the application.


The pipeline usually runs fine for about 15-20 mins, then start making 
intermittent checkpoint failures.

The exception is:

Error while processing checkpoint acknowledgement message

org.apache.flink.util.SerializedThrowable: 
s3://3db4bd0e0169500d35dc925c1fa9414b79d097b8/a15fb789b6dab24701c19f42a61b1cf7-939927294066-1628593971563/checkpoints/a15fa789b6dab24701c19f41a61b1cf7/chk-20/_metadata 
already exists
     at 
org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:758) ~[?:?]
     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
     at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:141) 
~[?:?]
     at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37) 
~[?:?]
     at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:169) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
     at 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
     at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
     at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:306) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
     at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
     ... 8 more
Wrapped by: org.apache.flink.runtime.checkpoint.CheckpointException: 
Could not finalize the pending checkpoint 20. Failure reason: Failure to 
finalize checkpoint.
     at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
     at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
     at 
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:819) 
~[flink-dist_2.12-1.11.1.jar:1.11.1]
     at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 
[?:?]
     at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
     at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) 
[?:?]
     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
     at java.lang.Thread.run(Thread.java:829) [?:?]

But I can't seem to find any Flink metric that correlate with these 
intermittent checkpoint failures.


I really appreciate any insights on troubleshooting this.


-- 
Thanks & regards,

Gayan Weerakutti

linkedin.com/in/gayanweerakutti 
<https://www.linkedin.com/in/gayanweerakutti/>