Posted to issues@flink.apache.org by "Gyula Fora (Jira)" <ji...@apache.org> on 2022/12/01 22:23:00 UTC

[jira] [Updated] (FLINK-30268) HA metadata and other cluster submission related errors should not throw DeploymentFailedException

     [ https://issues.apache.org/jira/browse/FLINK-30268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gyula Fora updated FLINK-30268:
-------------------------------
    Issue Type: Bug  (was: Improvement)

> HA metadata and other cluster submission related errors should not throw DeploymentFailedException
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30268
>                 URL: https://issues.apache.org/jira/browse/FLINK-30268
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Gyula Fora
>            Assignee: Peter Vary
>            Priority: Blocker
>             Fix For: kubernetes-operator-1.3.0
>
>
> Currently, most critical cluster submission errors, as well as the checks that validate HA metadata before deployment, end up throwing DeploymentFailedException.
> This puts the operator into a weird state and actually hides the error in subsequent reconciliation loops:
> {noformat}
> flink-kubernetes-operator 2022-12-01 21:55:03,978 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info    | UPGRADING       | The resource is being upgraded 
> flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Info    | SUBMIT          | Starting deployment
> flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.s.AbstractFlinkService [INFO ][default/basic-checkpoint-ha-example] Deploying application cluster requiring last-state from HA metadata
> flink-kubernetes-operator 2022-12-01 21:55:03,997 o.a.f.k.o.c.FlinkDeploymentController [ERROR][default/basic-checkpoint-ha-example] Flink Deployment failed
> flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.DeploymentFailedException: HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.validateHaMetadataExists(AbstractFlinkService.java:844)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:177)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:195)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
> flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> flink-kubernetes-operator     at java.base/java.lang.Thread.run(Unknown Source)
> flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Warning | RESTOREFAILED   | HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
> flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-checkpoint-ha-example] End of reconciliation
> flink-kubernetes-operator 2022-12-01 21:55:04,054 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error   | UPGRADING       | {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.","additionalMetadata":{"reason":"RestoreFailed"},"throwableList":[]} 
> flink-kubernetes-operator 2022-12-01 21:55:19,056 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-checkpoint-ha-example] Starting reconciliation
> flink-kubernetes-operator 2022-12-01 21:55:19,058 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][default/basic-checkpoint-ha-example] UPGRADE change(s) detected (FlinkDeploymentSpec[job.state=RUNNING] differs from FlinkDeploymentSpec[job.state=SUSPENDED]), starting reconciliation.
> flink-kubernetes-operator 2022-12-01 21:55:19,092 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info    | UPGRADING       | The resource is being upgraded 
> flink-kubernetes-operator 2022-12-01 21:55:19,119 o.a.f.k.o.r.d.ApplicationReconciler [ERROR][default/basic-checkpoint-ha-example] Invalid status for deployment: FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=CarTopSpeedWindowingExample, jobId=8d5c59b7e960984cd845b9977754d2ef, state=RECONCILING, startTime=1669931677233, updateTime=1669931696153, savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=null, triggerTimestamp=null, triggerType=null, formatType=null, savepointHistory=[], lastPeriodicSavepointTimestamp=0)), error=null), clusterInfo={flink-version=1.15.2, flink-revision=69e8126 @ 2022-08-17T14:58:06+02:00}, jobManagerDeploymentStatus=ERROR, reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1669931719059, lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/TopSpeedWindowing.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":0,"initialSavepointPath":null,"upgradeMode":"last-state","allowNonRestoredState":null},"restartNonce":2,"flinkConfiguration":{"high-availability":"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory","high-availability.storageDir":"file:///flink-data/ha","state.checkpoints.dir":"file:///flink-data/checkpoints","state.savepoints.dir":"file:///flink-data/savepoints","taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/flink-data","name":"flink-volume"}]}],"volumes":[{"hostPath":{"path":"/tmp/flink","type":"Directory"},"name":"flink-volume"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":5},"firstDeployment":false}}, lastStableSpec=null, state=UPGRADING)), taskManager=TaskManagerInfo(labelSelector=, replicas=0))
> flink-kubernetes-operator 2022-12-01 21:55:19,133 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Warning | CLUSTERDEPLOYMENTEXCEPTION | This indicates a bug...
> flink-kubernetes-operator 2022-12-01 21:55:19,136 o.a.f.k.o.r.ReconciliationUtils [WARN ][default/basic-checkpoint-ha-example] Attempt count: 0, last attempt: false
> flink-kubernetes-operator 2022-12-01 21:55:19,163 o.a.f.k.o.l.AuditUtils         [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error   | UPGRADING       | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.RuntimeException: This indicates a bug...","throwableList":[{"type":"java.lang.RuntimeException","message":"This indicates a bug..."}]} 
> flink-kubernetes-operator 2022-12-01 21:55:19,164 i.j.o.p.e.ReconciliationDispatcher [ERROR][default/basic-checkpoint-ha-example] Error during event processing ExecutionScope{ resource id: ResourceID{name='basic-checkpoint-ha-example', namespace='default'}, version: 350553} failed.
> flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.RuntimeException: This indicates a bug...
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
> flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
> flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> flink-kubernetes-operator     at java.base/java.lang.Thread.run(Unknown Source)
> flink-kubernetes-operator Caused by: java.lang.RuntimeException: This indicates a bug...
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:180)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
> flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
> flink-kubernetes-operator     ... 13 more
> {noformat}
> The main cause is that DeploymentFailedException was originally introduced so that the observer could signal a JobManager deployment failure (after submission). The error-handler logic in the controller therefore updates jobManagerDeploymentStatus and the job state, which causes the problem.
> To avoid this we should introduce a new exception type or use something more suitable. In most of these cases we should not touch jobManagerDeploymentStatus or the job status at all, and simply retrigger the reconciliation. This keeps the CR in an error loop that triggers warnings etc., but that is expected in these critical failure scenarios.
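The proposed fix could be sketched as below. All names here are hypothetical (RecoveryFailureException, DeploymentStatus, the handler shape) and are not the actual operator API; the point is only to illustrate the distinction the description asks for: pre-submission validation errors are surfaced and retried without touching jobManagerDeploymentStatus, while the existing DeploymentFailedException path keeps its status transition.

```java
/** Hypothetical exception for errors detected BEFORE the JM deployment exists
 *  (e.g. missing HA metadata), as opposed to DeploymentFailedException which
 *  signals a failure of an already-submitted JobManager deployment. */
class RecoveryFailureException extends RuntimeException {
    final String reason;
    RecoveryFailureException(String message, String reason) {
        super(message);
        this.reason = reason;
    }
}

/** Minimal stand-in for the parts of FlinkDeploymentStatus the handler touches. */
class DeploymentStatus {
    String jobManagerDeploymentStatus = "DEPLOYING";
    String error;
}

class ErrorHandler {
    /** Returns true if reconciliation should simply be retriggered. */
    static boolean handle(RuntimeException e, DeploymentStatus status) {
        if (e instanceof RecoveryFailureException) {
            // Record the error for visibility, but leave the JM deployment
            // status untouched so subsequent loops still see the real state
            // and keep reporting the original failure.
            status.error = e.getMessage();
            return true;
        }
        // Existing DeploymentFailedException-style path: the JM deployment
        // failed after submission, so the status transition is legitimate.
        status.jobManagerDeploymentStatus = "ERROR";
        status.error = e.getMessage();
        return false;
    }
}

public class Sketch {
    public static void main(String[] args) {
        DeploymentStatus s = new DeploymentStatus();
        boolean retry = ErrorHandler.handle(
            new RecoveryFailureException(
                "HA metadata not available to restore from last state",
                "RestoreFailed"),
            s);
        // The pre-submission error asks for a retry and does not flip the
        // deployment status to ERROR.
        System.out.println(retry + " " + s.jobManagerDeploymentStatus);
        // prints: true DEPLOYING
    }
}
```

With this split, the controller can keep retriggering reconciliation on RecoveryFailureException without entering the "Invalid status for deployment" state shown in the log above.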



--
This message was sent by Atlassian Jira
(v8.20.10#820010)