Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/04/11 06:44:00 UTC

[jira] [Comment Edited] (FLINK-27148) UnalignedCheckpointITCase fails on AZP

    [ https://issues.apache.org/jira/browse/FLINK-27148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520331#comment-17520331 ] 

Yun Gao edited comment on FLINK-27148 at 4/11/22 6:43 AM:
----------------------------------------------------------

After some checking, I think this issue is caused by the change in https://issues.apache.org/jira/browse/FLINK-26394. [~pltbkd], could you have a look?

The scenario for this issue is:
# The timer thread starts triggering a checkpoint.
# In createPendingCheckpoint, after putting the checkpoint into pendingCheckpoints, the timer thread gives up the CPU.
# The JM main thread starts stopping the CheckpointCoordinator, which in turn aborts all pending checkpoints.

The behavior diverges from this point. Previously:
# The JM main thread aborts the pending checkpoint.
# The timer thread resumes. It does not check whether the pending checkpoint has been aborted, so it continues to trigger the operator coordinators. This marks the valve with currentCheckpointId = 11.
# After that it starts triggering the tasks. However, a check at this point finds that the pending checkpoint has been aborted, so it calls onTriggerFailure, which resets the valve's currentCheckpointId to None.


After the change:
# The JM main thread aborts the pending checkpoint. However, because the newly introduced masterTriggerCompletePromise gets canceled, this enqueues a new onTriggerFailure task on the timer thread.
# The timer thread executes onTriggerFailure and resets the valve's currentCheckpointId to None.
# The timer thread resumes the triggering as before and continues to trigger the operator coordinators. This marks the valve with currentCheckpointId = 11.
# However, since masterTriggerCompletePromise has already been completed, the triggering stops after the source coordinator has acknowledged. No onTriggerFailure is executed after that, so the valve keeps currentCheckpointId = 11. The next trigger, for checkpoint 12, then fails in markForCheckpoint with the IllegalStateException quoted in the issue description.
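
To make the two orderings easier to compare, here is a minimal, single-threaded sketch that replays them deterministically. It is not the Flink code: ValveRaceSketch, Valve, replayBeforeChange and replayAfterChange are hypothetical names, the valve is reduced to a single field, and the timer thread is modeled as a simple task queue. Only the error message text is taken from the stack trace in this issue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified, hypothetical model of the two orderings; not the Flink classes. */
final class ValveRaceSketch {

    /** Stand-in for the valve: remembers which checkpoint it is marked for. */
    static final class Valve {
        static final long NONE = -1L;
        private long currentCheckpointId = NONE;

        void markForCheckpoint(long checkpointId) {
            if (currentCheckpointId != NONE && currentCheckpointId != checkpointId) {
                throw new IllegalStateException(
                        "Cannot mark for checkpoint " + checkpointId
                                + ", already marked for checkpoint " + currentCheckpointId);
            }
            currentCheckpointId = checkpointId;
        }

        void reset() {
            currentCheckpointId = NONE;
        }

        long current() {
            return currentCheckpointId;
        }
    }

    /** Old ordering: the abort is only noticed when triggering the tasks. */
    static Valve replayBeforeChange() {
        Valve valve = new Valve();
        boolean aborted = true;       // main thread aborted while the timer thread was paused
        valve.markForCheckpoint(11);  // timer thread resumes and triggers the coordinators
        if (aborted) {
            valve.reset();            // check before triggering tasks -> onTriggerFailure
        }
        return valve;
    }

    /** New ordering: the abort queues onTriggerFailure, which runs before the mark. */
    static Valve replayAfterChange() {
        Valve valve = new Valve();
        Queue<Runnable> timerThreadTasks = new ArrayDeque<>();
        timerThreadTasks.add(valve::reset);                       // queued by the abort
        timerThreadTasks.add(() -> valve.markForCheckpoint(11));  // resumed trigger
        // The promise is already completed, so no later onTriggerFailure is queued.
        while (!timerThreadTasks.isEmpty()) {
            timerThreadTasks.poll().run();
        }
        return valve;
    }

    public static void main(String[] args) {
        System.out.println("before: " + replayBeforeChange().current());
        Valve stale = replayAfterChange();
        System.out.println("after: " + stale.current());
        try {
            stale.markForCheckpoint(12);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the old ordering leaves the valve unmarked, while the new ordering leaves it marked for 11, so the attempt to mark it for 12 throws.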

Overall, the trigger is currently split into multiple steps, and the pending checkpoint might be aborted between any two of them. We need to check this state carefully at the start of each step to avoid inconsistency.
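
As a rough illustration of that idea (not a proposed patch), the sketch below re-checks an abort flag before every step and stops before any further side effect. GuardedTriggerSketch, PendingCheckpoint and runGuarded are hypothetical names, and the abort is simulated by a step in the list rather than by a second thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of "check for abort at the start of every step"; not Flink code. */
final class GuardedTriggerSketch {

    /** Stand-in for a pending checkpoint that may be aborted at any time. */
    static final class PendingCheckpoint {
        final long id;
        private final AtomicBoolean aborted = new AtomicBoolean(false);

        PendingCheckpoint(long id) {
            this.id = id;
        }

        void abort() {
            aborted.set(true);
        }

        boolean isAborted() {
            return aborted.get();
        }
    }

    /** Runs the steps in order and returns how many ran before an abort was seen. */
    static int runGuarded(PendingCheckpoint pending, List<Runnable> steps) {
        int executed = 0;
        for (Runnable step : steps) {
            if (pending.isAborted()) {
                break; // stop before side effects such as marking the valve
            }
            step.run();
            executed++;
        }
        return executed;
    }

    public static void main(String[] args) {
        PendingCheckpoint pending = new PendingCheckpoint(11);
        long[] markedFor = {-1L}; // -1 means "not marked" in this sketch

        List<Runnable> steps = new ArrayList<>();
        steps.add(() -> { });                       // create the pending checkpoint
        steps.add(pending::abort);                  // simulated abort between two steps
        steps.add(() -> markedFor[0] = pending.id); // trigger coordinators (marks the valve)

        int executed = runGuarded(pending, steps);
        System.out.println("steps executed: " + executed + ", marked for: " + markedFor[0]);
    }
}
```

With a real second thread, the check and the step would additionally have to be atomic with respect to the abort, for example by running both on the same thread or under a common lock.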

Since this PR has only been merged into master, I think release-1.15 is still OK. I'll first remove 1.15 from the affected and fix versions.


> UnalignedCheckpointITCase fails on AZP
> --------------------------------------
>
>                 Key: FLINK-27148
>                 URL: https://issues.apache.org/jira/browse/FLINK-27148
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Network
>    Affects Versions: 1.16.0
>            Reporter: Roman Khachatryan
>            Priority: Blocker
>             Fix For: 1.16.0
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba&l=5812]
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34394&view=logs&j=baf26b34-3c6a-54e8-f93f-cf269b32f802&t=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9&l=6018]
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34448&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7&l=41655]
> Relevant error message:
>  
> {code:java}
> Caused by: java.lang.IllegalStateException: Cannot mark for checkpoint 12, already marked for checkpoint 11
> 	at org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
>  {code}
>  
> {code:java}
> [ERROR] Tests run: 22, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 174.732 s <<< FAILURE! - in org.apache.flink.test.checkpointing.UnalignedCheckpointITCase
> [ERROR] UnalignedCheckpointITCase.execute  Time elapsed: 6.408 s  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> 	at org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:184)
> 	at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:287)
> 	at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
> 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 	at org.junit.runners.Suite.runChild(Suite.java:128)
> 	at org.junit.runners.Suite.runChild(Suite.java:27)
> 	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> 	at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
> 	at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
> 	at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> 	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
> 	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
> 	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
> 	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
> 	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
> 	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
> 	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
> 	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
> 	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
> 	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
> 	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
> 	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
> 	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
> 	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
> 	at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
> 	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> Caused by: java.lang.IllegalStateException: Cannot mark for checkpoint 12, already marked for checkpoint 11
> 	at org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
> 	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:302)
> 	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:230)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
> 	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> 	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> 	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> 	at akka.actor.Actor.aroundReceive(Actor.scala:537)
> 	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> 	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> 	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> 	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> 	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)