Posted to dev@flink.apache.org by "suheng.cloud (Jira)" <ji...@apache.org> on 2022/05/28 07:22:00 UTC

[jira] [Created] (FLINK-27823) Standalone Job continuously restart by illegal checkpointId check on PartitionTimeCommitTrigger when use FilesystemTableSink

suheng.cloud created FLINK-27823:
------------------------------------

             Summary: Standalone Job continuously restart by illegal checkpointId check on PartitionTimeCommitTrigger when use FilesystemTableSink
                 Key: FLINK-27823
                 URL: https://issues.apache.org/jira/browse/FLINK-27823
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.13.6
            Reporter: suheng.cloud


Hi, community

I set up a standalone job that reads from a Kafka topic and sinks to HDFS. After running normally for about 4 hours, the job started restarting continuously.
At the first restart, the logs look like this:

```
2022-05-28 00:24:04,861 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 26 (type=CHECKPOINT) @ 1653668644856 for job 00000000000000000000000000000000.
2022-05-28 00:34:04,861 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 26 of job 00000000000000000000000000000000 expired before completing.
2022-05-28 00:34:04,866 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 27 (type=CHECKPOINT) @ 1653669244862 for job 00000000000000000000000000000000.
2022-05-28 00:41:02,208 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 27 for job 00000000000000000000000000000000 (117373 bytes in 417284 ms).
2022-05-28 00:41:18,517 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PartitionCommitter -> Sink: end (1/1) (7e16853a4d16a80f96a3e26e17f9d677) switched from RUNNING to FAILED on 192.168.1.142:6122-0b54e0 @ 192.168.1.142 (dataPort=43131).
java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The watermark information is: {27=1653668944610}.
at org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[hamal-driver-1.13.6-v1.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[hamal-driver-1.13.6-v1.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
2022-05-28 00:41:18,524 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 00:41:18,525 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 50 tasks should be restarted to recover the failed task cc0206f9bd17ee99dc4565713cd749d7_0. 
2022-05-28 00:41:18,525 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx (00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
2022-05-28 00:41:18,526 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator (24/24) (8d5ae185e722482d8b1ff4bc3ba60e86) switched from RUNNING to CANCELING.
2022-05-28 00:41:18,526 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator (23/24) (369af96456d991046eb10cfee44df415) switched from RUNNING to CANCELING.
...
...
```

After that, the job restarted and successfully restored its state from the checkpoint (using state.checkpoint-storage=jobmanager), and the following checkpoints (27/28/29/...) also completed successfully. But it seems the recovered state keeps trying to report the commit message of the old checkpoint 26 to the PartitionCommitter, which causes the failure every time.
In the end the job restarts again and again, with the same error in the log:

```
2022-05-28 08:36:23,718 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PartitionCommitter -> Sink: end (1/1) (669dfe28f49ec9b08cb1f605b7e1af86) switched from RUNNING to FAILED on 192.168.1.226:6122-ac5e98 @ 192.168.1.226 (dataPort=41827).
java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The watermark information is: {27=1653668944610, 28=1653669762385, 29=1653669973437, 30=1653670045517, 31=1653670584329, 32=1653671198834, 33=1653671316604, 34=1653671595057, 35=1653671632382, 36=1653671940262, 37=1653672247793, 38=1653672513421, 39=1653672626251, 40=1653672872425, 41=1653673029517, 42=1653673662173, 43=1653673843265, 44=1653674382981, 45=1653674739299, 46=1653674890522, 47=1653675402372, 48=1653675767340, 49=1653676205712, 50=1653676376692, 51=1653676762574, 52=1653677105303, 53=1653677254604, 54=1653677458683, 55=1653677651603, 57=1653678458691, 58=1653678931845, 59=1653679306742, 60=1653679845020, 61=1653680406114, 62=1653680981416, 63=1653681545056, 64=1653681584696, 65=1653681622029, 66=1653682017861, 67=1653682319529, 68=1653682404672, 69=1653682559904, 70=1653682804993, 71=1653682907991, 72=1653683279780, 73=1653683905573, 74=1653684156034, 75=1653684659397, 76=1653684975030, 77=1653685329183, 78=1653685862724, 79=1653686499090, 80=1653686636903, 81=1653686780782, 82=1653687053096, 83=1653687541953, 84=1653688012617, 85=1653688337464, 86=1653688832762, 87=1653689195316, 88=1653689330027, 89=1653689545859, 90=1653689957313, 91=1653690069643, 92=1653690689424, 93=1653690963316, 94=1653691164532, 95=1653691687307, 96=1653691885408, 97=1653692235231, 98=1653692428716, 99=1653692849146, 100=1653693274253, 101=1653693438601, 102=1653694097925, 103=1653694716179, 104=1653694770858, 105=1653695305421, 106=1653695464923, 107=1653695959050, 108=1653696465917, 109=1653696825723, 110=1653696841452, 111=1653697238699, 112=1653697882510}.
at org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143) ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[hamal-driver-1.13.6-v1.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[hamal-driver-1.13.6-v1.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
2022-05-28 08:36:23,718 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 08:36:23,719 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 50 tasks should be restarted to recover the failed task cc0206f9bd17ee99dc4565713cd749d7_0. 
2022-05-28 08:36:23,719 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx (00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
2022-05-28 08:36:23,719 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator (24/24) (66177c2069b9aeef21376d7a780ceadb) switched from RUNNING to CANCELING.
2022-05-28 08:36:23,719 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator (23/24) (7028f34c1756cd1fff5cf25dd12fd550) switched from RUNNING to CANCELING.
```

The job logic is very simple. The Flink SQL looks like this:

```

CREATE TEMPORARY TABLE filesystem_sink_table(....)
PARTITIONED BY(`dt`,`hour`,`topic`) WITH(
    'connector'='filesystem',
    'format'='textfile',
    'sink.partition-commit.trigger'='partition-time',
    'sink.partition-commit.delay'='1 hour',
    'sink.partition-commit.policy.kind'='success-file',
    'auto-compaction' = 'true'
    ...
);

CREATE TEMPORARY TABLE kafka_source_table ...

streamTableEnv.executeSql("INSERT INTO filesystem_sink_table SELECT ... FROM kafka_source_table");

```

I looked at the source of PartitionTimeCommitTrigger. What puzzles me is that entries in the watermarks map should only be removed for a committed checkpointId after it passes the validation:

```

...
if (!watermarks.containsKey(checkpointId)) {
    throw new IllegalArgumentException(
            String.format(
                    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                    checkpointId, watermarks));
}

long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();
...

```
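
To show the check in isolation, here is a minimal standalone sketch. It is not Flink code: the class `WatermarkBookkeepingSketch` and its methods `snapshot`, `commit` and `pendingEntries` are names I made up, and it only assumes what the snippet above shows, namely a sorted map from checkpointId to watermark. It records checkpoint 27 only, as in the first log, and then asks for checkpoint 26.

```java
import java.util.TreeMap;

/** Hypothetical stand-in for the watermark bookkeeping quoted above; not Flink code. */
class WatermarkBookkeepingSketch {
    // checkpointId -> watermark recorded when that checkpoint was snapshotted
    private final TreeMap<Long, Long> watermarks = new TreeMap<>();

    /** Records the watermark seen when a checkpoint snapshot is taken. */
    void snapshot(long checkpointId, long watermark) {
        watermarks.put(checkpointId, watermark);
    }

    /** Mirrors the quoted check: fails if the checkpoint id was never recorded. */
    long commit(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                            checkpointId, watermarks));
        }
        long watermark = watermarks.get(checkpointId);
        // Drops this entry and every older one, as in the quoted snippet.
        watermarks.headMap(checkpointId, true).clear();
        return watermark;
    }

    /** Number of checkpoint ids still waiting for a commit request. */
    int pendingEntries() {
        return watermarks.size();
    }

    public static void main(String[] args) {
        WatermarkBookkeepingSketch sketch = new WatermarkBookkeepingSketch();
        // Only checkpoint 27 is recorded, matching {27=1653668944610} in the first log.
        sketch.snapshot(27L, 1653668944610L);
        try {
            // Assumption: a commit request for the expired checkpoint 26 still arrives.
            sketch.commit(26L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // A request for a recorded id passes the check and clears the map.
        System.out.println(sketch.commit(27L));
        System.out.println(sketch.pendingEntries());
    }
}
```

In this sketch a request for an id that was never recorded can never succeed later, because nothing adds the missing entry afterwards. That would be consistent with the repeated failures in the logs, but whether the real operator state behaves the same way after restore is exactly what I am unsure about.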

So, did I misconfigure something, or is there some inconsistent state?

Thanks for any help.


