You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/10 15:05:08 UTC

[GitHub] [flink-kubernetes-operator] Aitozi opened a new pull request, #307: Flink 28187

Aitozi opened a new pull request, #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307

   ## What is the purpose of the change
   
   This PR is meant to use the same mechanism of the `AbstractDeploymentObserver`. It can recognize the lost job information now, based on the fixed jobId generator. 
   
   ## Brief change log
   
     - Use the `uid.hashcode` + `generation` to make up the `JobID`.
     - Detect the jobs in the session cluster with the same uid prefix and generation. 
   
   ## Verifying this change
   
   - new tests to verify the change. 
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (no)
     - Core observer or reconciler logic that is regularly executed: (yes)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #307: [FLINK-28187] Handle SessionJob errors on observe

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307#issuecomment-1179750216

   cc @gyfora 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #307: [FLINK-28187] Handle SessionJob errors on observe

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307#discussion_r917430020


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -115,4 +131,62 @@ public void observe(FlinkSessionJob flinkSessionJob, Context context) {
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
+
+    private void checkIfAlreadyUpgraded(
+            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+        var uid = flinkSessionJob.getMetadata().getUid();
+        Collection<JobStatusMessage> jobStatusMessages;
+        try {
+            jobStatusMessages = flinkService.listJobs(deployedConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list jobs", e);
+        }
+        var matchedJobs = new ArrayList<JobID>();
+        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+            var jobId = jobStatusMessage.getJobId();
+            if (jobId.getLowerPart() == uid.hashCode()
+                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+                matchedJobs.add(jobId);
+            }
+        }
+
+        if (matchedJobs.isEmpty()) {
+            return;
+        } else if (matchedJobs.size() > 1) {
+            // this indicates a bug, which means we have more than one running job for a single
+            // SessionJob CR.
+            throw new RuntimeException(
+                    String.format(
+                            "Unexpected case: %d job found for the resource with uid: %s",
+                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+        } else {
+            var matchedJobID = matchedJobs.get(0);
+            Long upgradeTargetGeneration =
+                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+            long deployedGeneration = matchedJobID.getUpperPart();
+
+            if (upgradeTargetGeneration == deployedGeneration) {
+                var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+                LOG.info(
+                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+                        oldJobID,
+                        matchedJobID.toHexString());
+                ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setJobId(matchedJobs.get(0).toHexString());
+            } else {
+                LOG.warn(
+                        "Running job {}'s generation {} doesn't match upgrade target generation {}.",
+                        matchedJobID.toHexString(),
+                        deployedGeneration,
+                        upgradeTargetGeneration);
+            }

Review Comment:
   I think it might make sense to throw an error at this point. Otherwise we get double submission because the new job will have a new jobid due to the different generation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #307: [FLINK-28187] Handle SessionJob errors on observe

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307#discussion_r917458759


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -115,4 +131,62 @@ public void observe(FlinkSessionJob flinkSessionJob, Context context) {
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
+
+    private void checkIfAlreadyUpgraded(
+            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+        var uid = flinkSessionJob.getMetadata().getUid();
+        Collection<JobStatusMessage> jobStatusMessages;
+        try {
+            jobStatusMessages = flinkService.listJobs(deployedConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list jobs", e);
+        }
+        var matchedJobs = new ArrayList<JobID>();
+        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+            var jobId = jobStatusMessage.getJobId();
+            if (jobId.getLowerPart() == uid.hashCode()
+                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+                matchedJobs.add(jobId);
+            }
+        }
+
+        if (matchedJobs.isEmpty()) {
+            return;
+        } else if (matchedJobs.size() > 1) {
+            // this indicates a bug, which means we have more than one running job for a single
+            // SessionJob CR.
+            throw new RuntimeException(
+                    String.format(
+                            "Unexpected case: %d job found for the resource with uid: %s",
+                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+        } else {
+            var matchedJobID = matchedJobs.get(0);
+            Long upgradeTargetGeneration =
+                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+            long deployedGeneration = matchedJobID.getUpperPart();
+
+            if (upgradeTargetGeneration == deployedGeneration) {
+                var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+                LOG.info(
+                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+                        oldJobID,
+                        matchedJobID.toHexString());
+                ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setJobId(matchedJobs.get(0).toHexString());
+            } else {
+                LOG.warn(
+                        "Running job {}'s generation {} doesn't match upgrade target generation {}.",
+                        matchedJobID.toHexString(),
+                        deployedGeneration,
+                        upgradeTargetGeneration);
+            }

Review Comment:
   I think we have a chance to get here:
   
   1. The upgrade fails between the `UPGRADING` status recorded and deploy
   2. The deployedGeneration will smaller than the targetGeneration
   
   In this case, we have to wait for the next turn to deal with reconcileSpecChange. It will cancel the oldJobID in jobStatus and submit with new JobID.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #307: [FLINK-28187] Handle SessionJob errors on observe

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #307: [FLINK-28187] Handle SessionJob errors on observe

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307#discussion_r917459062


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -115,4 +131,62 @@ public void observe(FlinkSessionJob flinkSessionJob, Context context) {
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
+
+    private void checkIfAlreadyUpgraded(
+            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+        var uid = flinkSessionJob.getMetadata().getUid();
+        Collection<JobStatusMessage> jobStatusMessages;
+        try {
+            jobStatusMessages = flinkService.listJobs(deployedConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list jobs", e);
+        }
+        var matchedJobs = new ArrayList<JobID>();
+        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+            var jobId = jobStatusMessage.getJobId();
+            if (jobId.getLowerPart() == uid.hashCode()
+                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+                matchedJobs.add(jobId);
+            }
+        }
+
+        if (matchedJobs.isEmpty()) {
+            return;
+        } else if (matchedJobs.size() > 1) {
+            // this indicates a bug, which means we have more than one running job for a single
+            // SessionJob CR.
+            throw new RuntimeException(
+                    String.format(
+                            "Unexpected case: %d job found for the resource with uid: %s",
+                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+        } else {
+            var matchedJobID = matchedJobs.get(0);
+            Long upgradeTargetGeneration =
+                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+            long deployedGeneration = matchedJobID.getUpperPart();
+
+            if (upgradeTargetGeneration == deployedGeneration) {
+                var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+                LOG.info(
+                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+                        oldJobID,
+                        matchedJobID.toHexString());
+                ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setJobId(matchedJobs.get(0).toHexString());
+            } else {
+                LOG.warn(
+                        "Running job {}'s generation {} doesn't match upgrade target generation {}.",
+                        matchedJobID.toHexString(),
+                        deployedGeneration,
+                        upgradeTargetGeneration);
+            }

Review Comment:
   For safe, I think we could throw an exception here if the matchedJobID do not equal the the id in the JobStatus too. WDTY ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #307: [FLINK-28187] Handle SessionJob errors on observe

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307#discussion_r917459062


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -115,4 +131,62 @@ public void observe(FlinkSessionJob flinkSessionJob, Context context) {
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
+
+    private void checkIfAlreadyUpgraded(
+            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+        var uid = flinkSessionJob.getMetadata().getUid();
+        Collection<JobStatusMessage> jobStatusMessages;
+        try {
+            jobStatusMessages = flinkService.listJobs(deployedConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list jobs", e);
+        }
+        var matchedJobs = new ArrayList<JobID>();
+        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+            var jobId = jobStatusMessage.getJobId();
+            if (jobId.getLowerPart() == uid.hashCode()
+                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+                matchedJobs.add(jobId);
+            }
+        }
+
+        if (matchedJobs.isEmpty()) {
+            return;
+        } else if (matchedJobs.size() > 1) {
+            // this indicates a bug, which means we have more than one running job for a single
+            // SessionJob CR.
+            throw new RuntimeException(
+                    String.format(
+                            "Unexpected case: %d job found for the resource with uid: %s",
+                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+        } else {
+            var matchedJobID = matchedJobs.get(0);
+            Long upgradeTargetGeneration =
+                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+            long deployedGeneration = matchedJobID.getUpperPart();
+
+            if (upgradeTargetGeneration == deployedGeneration) {
+                var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+                LOG.info(
+                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+                        oldJobID,
+                        matchedJobID.toHexString());
+                ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setJobId(matchedJobs.get(0).toHexString());
+            } else {
+                LOG.warn(
+                        "Running job {}'s generation {} doesn't match upgrade target generation {}.",
+                        matchedJobID.toHexString(),
+                        deployedGeneration,
+                        upgradeTargetGeneration);
+            }

Review Comment:
   For safe, I think we could throw an exception here if the matchedJobID do not equal to the JobID in the JobStatus, too. WDTY ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #307: [FLINK-28187] Handle SessionJob errors on observe

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307#discussion_r917582951


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -115,4 +131,62 @@ public void observe(FlinkSessionJob flinkSessionJob, Context context) {
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
+
+    private void checkIfAlreadyUpgraded(
+            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+        var uid = flinkSessionJob.getMetadata().getUid();
+        Collection<JobStatusMessage> jobStatusMessages;
+        try {
+            jobStatusMessages = flinkService.listJobs(deployedConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list jobs", e);
+        }
+        var matchedJobs = new ArrayList<JobID>();
+        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+            var jobId = jobStatusMessage.getJobId();
+            if (jobId.getLowerPart() == uid.hashCode()
+                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+                matchedJobs.add(jobId);
+            }
+        }
+
+        if (matchedJobs.isEmpty()) {
+            return;
+        } else if (matchedJobs.size() > 1) {
+            // this indicates a bug, which means we have more than one running job for a single
+            // SessionJob CR.
+            throw new RuntimeException(
+                    String.format(
+                            "Unexpected case: %d job found for the resource with uid: %s",
+                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+        } else {
+            var matchedJobID = matchedJobs.get(0);
+            Long upgradeTargetGeneration =
+                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+            long deployedGeneration = matchedJobID.getUpperPart();
+
+            if (upgradeTargetGeneration == deployedGeneration) {
+                var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+                LOG.info(
+                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+                        oldJobID,
+                        matchedJobID.toHexString());
+                ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setJobId(matchedJobs.get(0).toHexString());
+            } else {
+                LOG.warn(
+                        "Running job {}'s generation {} doesn't match upgrade target generation {}.",
+                        matchedJobID.toHexString(),
+                        deployedGeneration,
+                        upgradeTargetGeneration);
+            }

Review Comment:
   You are right, Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #307: [FLINK-28187] Handle SessionJob errors on observe

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307#discussion_r917540203


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -115,4 +131,62 @@ public void observe(FlinkSessionJob flinkSessionJob, Context context) {
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
+
+    private void checkIfAlreadyUpgraded(
+            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+        var uid = flinkSessionJob.getMetadata().getUid();
+        Collection<JobStatusMessage> jobStatusMessages;
+        try {
+            jobStatusMessages = flinkService.listJobs(deployedConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list jobs", e);
+        }
+        var matchedJobs = new ArrayList<JobID>();
+        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+            var jobId = jobStatusMessage.getJobId();
+            if (jobId.getLowerPart() == uid.hashCode()
+                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+                matchedJobs.add(jobId);
+            }
+        }
+
+        if (matchedJobs.isEmpty()) {
+            return;
+        } else if (matchedJobs.size() > 1) {
+            // this indicates a bug, which means we have more than one running job for a single
+            // SessionJob CR.
+            throw new RuntimeException(
+                    String.format(
+                            "Unexpected case: %d job found for the resource with uid: %s",
+                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+        } else {
+            var matchedJobID = matchedJobs.get(0);
+            Long upgradeTargetGeneration =
+                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+            long deployedGeneration = matchedJobID.getUpperPart();
+
+            if (upgradeTargetGeneration == deployedGeneration) {
+                var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+                LOG.info(
+                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+                        oldJobID,
+                        matchedJobID.toHexString());
+                ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setJobId(matchedJobs.get(0).toHexString());
+            } else {
+                LOG.warn(
+                        "Running job {}'s generation {} doesn't match upgrade target generation {}.",
+                        matchedJobID.toHexString(),
+                        deployedGeneration,
+                        upgradeTargetGeneration);
+            }

Review Comment:
   Sorry I don’t completely understand.
   
   when we are in UPGRADING state the old job is already suspended. We should never have upgrading state + still the old job running 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org