Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/02/27 07:24:54 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

gyfora opened a new pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26


   This ticket solves the problem of lingering configmaps after deleting HA clusters by trying to gracefully shut down jobs before deleting everything.
   
   It makes deletion heavier and prone to timeout exceptions, but this could be improved if we merge https://github.com/apache/flink-kubernetes-operator/pull/21 first; I can then improve this based on that.
   
   Async operations would also make this nicer, but that is a separate effort tracked in a different ticket.
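
The flow described in this message, a graceful cancel attempted before the resources are deleted, can be sketched roughly as follows. This is a minimal illustration and not the operator's code: `ClusterClient`, `cancelJob`, and `deleteCluster` are hypothetical stand-ins, and always deleting with the HA flag set is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: try a graceful cancel, then delete the cluster resources. */
class GracefulDeleteSketch {

    /** Illustrative stand-in for the cluster client; not the operator's FlinkService. */
    interface ClusterClient {
        void cancelJob() throws TimeoutException;

        void deleteCluster(boolean deleteHaConfigMaps);
    }

    /** Returns true if the graceful cancel succeeded before deletion. */
    static boolean shutdownAndDelete(ClusterClient client) {
        boolean cancelled;
        try {
            client.cancelJob();
            cancelled = true;
        } catch (TimeoutException e) {
            // The cancel call may time out; deletion still proceeds.
            cancelled = false;
        }
        // Assumption: HA configmaps are removed explicitly whether or not the cancel worked.
        client.deleteCluster(true);
        return cancelled;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        ClusterClient healthy = new ClusterClient() {
            @Override
            public void cancelJob() {
                calls.add("cancel");
            }

            @Override
            public void deleteCluster(boolean deleteHaConfigMaps) {
                calls.add("delete(ha=" + deleteHaConfigMaps + ")");
            }
        };
        System.out.println(shutdownAndDelete(healthy) + " " + calls);
    }
}
```

The point of the sketch is the ordering: the cancel is attempted first, and a timeout there does not prevent the cleanup from running.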


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817600903



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    deploymentStatus.setJobManagerDeploymentStatus(

Review comment:
       The flinkApp.getStatus().getJobStatus() check was previously there for already running jobs (that had a status) to avoid unnecessary delay.
   
   For new deployments the status was always null; in the first round it would be cached and in the second it would be ready. The behaviour is now the same but much more explicit. We also don't need the job status check anymore, because the deployment status is recorded in the resource, so old deployments are picked up automatically.
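
The two-round progression described in this comment can be illustrated with a small state function. This is a sketch under assumptions, not the `Observer` class itself: the boolean parameters are hypothetical stand-ins for the Kubernetes Deployment lookup, the replica check, and the port probe, and the enum is redeclared locally so the example is self-contained.

```java
/** Hypothetical sketch of the JobManager deployment status progression per observe round. */
class JmStatusSketch {

    enum Status { MISSING, DEPLOYING, DEPLOYED_NOT_READY, READY }

    /** One observe round; a null previous status represents a brand new deployment. */
    static Status observe(
            Status previous,
            boolean deploymentExists,
            boolean replicasAvailable,
            boolean portReady) {
        if (previous == Status.READY) {
            return Status.READY; // already recorded in the resource, nothing to re-check
        }
        if (previous == Status.DEPLOYED_NOT_READY) {
            return Status.READY; // the port was ready last round, so promote now
        }
        if (!deploymentExists) {
            return Status.MISSING;
        }
        if (replicasAvailable && portReady) {
            return Status.DEPLOYED_NOT_READY; // port is up, give the REST API one more round
        }
        return Status.DEPLOYING;
    }

    public static void main(String[] args) {
        Status status = null; // a new deployment has no recorded status yet
        for (int round = 1; round <= 3; round++) {
            status = observe(status, true, true, true);
            System.out.println("round " + round + ": " + status);
        }
    }
}
```

With a healthy deployment, the first round records `DEPLOYED_NOT_READY` and the second promotes it to `READY`; a deployment that was already `READY` before an operator restart skips the extra round.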







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815572683



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+    READY,

Review comment:
       Document what these mean?







[GitHub] [flink-kubernetes-operator] tweise commented on pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
tweise commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1057300673


   After manually deleting the old CRD (helm uninstall did not do it), the above issue goes away.





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815659148



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;

Review comment:
       I will try to make sure this case is accounted for with a test case.







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817483741



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            FlinkDeploymentSpec lastReconciledSpec =
+                    deploymentStatus.getReconciliationStatus().getLastReconciledSpec();
+
+            // If the job was successfully suspended we set to MISSING status, otherwise keep READY.
+            if (lastReconciledSpec != null
+                    && lastReconciledSpec.getJob() != null
+                    && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED) {
+
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+            }
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    deploymentStatus.setJobManagerDeploymentStatus(
+                            JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                    return;
+                } else {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(

Review comment:
       Does this need to log the status when the JobManager deployment port isn't ready?







[GitHub] [flink-kubernetes-operator] gyfora removed a comment on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora removed a comment on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053660192


   cc @wangyang0918 @morhidi 





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817609226



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    deploymentStatus.setJobManagerDeploymentStatus(

Review comment:
       What will happen if all the replicas are running, but the REST port is not reachable? We will have the `DEPLOYING` deployment status, not `DEPLOYED_NOT_READY`.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817672374



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,81 +47,22 @@ public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkServi
         this.flinkService = flinkService;
     }
 
-    public boolean removeDeployment(FlinkDeployment flinkApp) {
-        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
-    }
-
     public abstract UpdateControl<FlinkDeployment> reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
             Configuration effectiveConfig)
             throws Exception;
 
-    protected JobManagerDeploymentStatus checkJobManagerDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
-            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
-            if (deployment.isPresent()) {
-                DeploymentStatus status = deployment.get().getStatus();
-                DeploymentSpec spec = deployment.get().getSpec();
-                if (status != null
-                        && status.getAvailableReplicas() != null
-                        && spec.getReplicas().intValue() == status.getReplicas()
-                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
-                    // typically it takes a few seconds for the REST server to be ready
-                    if (flinkService.isJobManagerPortReady(effectiveConfig)) {
-                        LOG.info(
-                                "JobManager deployment {} in namespace {} is ready",
-                                flinkApp.getMetadata().getName(),
-                                flinkApp.getMetadata().getNamespace());
-                        jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return JobManagerDeploymentStatus.READY;
-                        }
-                    }
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} port not ready",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace());
-                    return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
-                }
-                LOG.info(
-                        "JobManager deployment {} in namespace {} not yet ready, status {}",
-                        flinkApp.getMetadata().getName(),
-                        flinkApp.getMetadata().getNamespace(),
-                        status);
-
-                return JobManagerDeploymentStatus.DEPLOYING;
-            }
-            return JobManagerDeploymentStatus.MISSING;
-        }
-        return JobManagerDeploymentStatus.READY;
-    }
-
-    /**
-     * Shuts down the job and deletes all kubernetes resources including k8s HA resources. It will
-     * first perform a graceful shutdown (cancel) before deleting the data if that is unsuccessful
-     * in order to avoid leaking HA metadata to durable storage.
-     *
-     * <p>This feature is limited at the moment to cleaning up native kubernetes HA resources, other
-     * HA providers like ZK need to be cleaned up manually after deletion.
-     */
     public DeleteControl shutdownAndDelete(
-            String operatorNamespace,
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig) {
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
 
-        if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                == JobManagerDeploymentStatus.READY) {
+        if (JobManagerDeploymentStatus.READY
+                == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
             shutdown(flinkApp, effectiveConfig);
         } else {
             FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
         }

Review comment:
       Should we also call `FlinkUtils.waitForClusterShutdown()` here? As far as I can see, `FlinkUtils.waitForClusterShutdown()` is only used for session clusters, and it waits until the JM and other services are cleared. Why is `deleteCluster` not enough? If it cannot fulfill our requirements, should we wait for the other services to be cleared in the `else` branch here?
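
For context on the question, waiting for a cluster to disappear usually amounts to polling until the resources are gone or an attempt budget runs out. Below is a minimal sketch of that pattern. It is not the actual `FlinkUtils.waitForClusterShutdown` implementation: the `resourcesGone` probe, the attempt count, and the sleep interval are all hypothetical.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: poll until cluster resources are gone or the attempts run out. */
class WaitForShutdownSketch {

    /** Returns true once the probe reports that nothing is left, false on timeout or interrupt. */
    static boolean waitUntilGone(BooleanSupplier resourcesGone, int maxAttempts, long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (resourcesGone.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts - 1) {
                try {
                    Thread.sleep(sleepMillis); // back off before probing again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] probes = {0};
        // The fake probe reports "gone" on the third call.
        boolean gone = waitUntilGone(() -> ++probes[0] >= 3, 5, 10L);
        System.out.println("gone=" + gone + " after " + probes[0] + " probes");
    }
}
```

Whether such a wait belongs in the `else` branch depends on what `deleteCluster` already guarantees, which is the open question in the comment.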







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817523349



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
##########
@@ -44,12 +44,12 @@
     public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
         switch (this) {
             case DEPLOYING:
+            case READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
             case DEPLOYED_NOT_READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
-            case READY:
             case MISSING:

Review comment:
       I would prefer to keep it just for transparency; technically it's not needed, but I think it makes it clearer.







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053901452


   Thank you @tweise and @wangyang0918, I will address the comments in the original commit.
   
   I have now pushed an optional refactor commit that restores the observe/reconcile flow that we had before @tweise's recent change, while making it clearer and including the jobmanager deployment validation.
   
   I would like to hear your opinion about this. I think this makes the reconciler simpler and the flow cleaner, but this is an optional change if you feel otherwise.





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817601765



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(

Review comment:
       Yes, this is true. We probably need a FAILED state to handle some of these cases, plus some further changes in other places. I agree it is better to leave this to a follow-up.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815588220



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -74,11 +72,14 @@ public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkServic
                     Optional.ofNullable(jobSpec.getInitialSavepointPath()));
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
+            return UpdateControl.updateStatus(flinkApp)

Review comment:
       That makes it clearer, I agree.







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r818248021



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    deploymentStatus.setJobManagerDeploymentStatus(

Review comment:
       You are right.







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815571835



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,11 +65,8 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
             Configuration effectiveConfig)
             throws Exception;
 
-    protected UpdateControl<FlinkDeployment> checkJobManagerDeployment(
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig,
-            FlinkService flinkService) {
+    protected JobDeploymentStatus checkJobManagerDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
         if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {

Review comment:
       Let's keep that out of this PR. However, I don't think we would want to repeat the port-ready check on every reconciliation, even if the deployment is cached. I will think a bit more about it when implementing the terminal error state for https://issues.apache.org/jira/browse/FLINK-26261
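
       One way to read the caching concern above is sketched below: remember the UIDs that already passed the port check, so the probe does not run again on later reconciliations. This is only an illustration. `ReadyDeploymentCache`, `isReady` and the `portProbe` parameter are hypothetical names, loosely modelled on the `jobManagerDeployments` set visible in the diff, and not the operator's actual code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: remember which deployments already passed the port check. */
final class ReadyDeploymentCache {

    // UIDs of deployments whose JobManager port was already seen open.
    private final Set<String> readyUids = ConcurrentHashMap.newKeySet();

    /** Runs the (potentially slow) probe only for UIDs that are not cached yet. */
    boolean isReady(String uid, BooleanSupplier portProbe) {
        if (readyUids.contains(uid)) {
            return true; // cached: no probe on this reconciliation
        }
        if (portProbe.getAsBoolean()) {
            readyUids.add(uid);
            return true;
        }
        return false;
    }

    /** Forgets a deployment, for example after it was suspended or deleted. */
    boolean remove(String uid) {
        return readyUids.remove(uid);
    }
}
```

       A cache like this lives only in operator memory, so it is lost on operator restart; that is one reason a status stored on the resource may be preferable.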







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815573624



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;

Review comment:
       The check above was needed to delay the job check so that it does not run into a timeout, while skipping that delay for pre-existing deployments. How is that covered now?







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817534799



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,81 +47,22 @@ public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkServi
         this.flinkService = flinkService;
     }
 
-    public boolean removeDeployment(FlinkDeployment flinkApp) {
-        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
-    }
-
     public abstract UpdateControl<FlinkDeployment> reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
             Configuration effectiveConfig)
             throws Exception;
 
-    protected JobManagerDeploymentStatus checkJobManagerDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
-            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
-            if (deployment.isPresent()) {
-                DeploymentStatus status = deployment.get().getStatus();
-                DeploymentSpec spec = deployment.get().getSpec();
-                if (status != null
-                        && status.getAvailableReplicas() != null
-                        && spec.getReplicas().intValue() == status.getReplicas()
-                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
-                    // typically it takes a few seconds for the REST server to be ready
-                    if (flinkService.isJobManagerPortReady(effectiveConfig)) {
-                        LOG.info(
-                                "JobManager deployment {} in namespace {} is ready",
-                                flinkApp.getMetadata().getName(),
-                                flinkApp.getMetadata().getNamespace());
-                        jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return JobManagerDeploymentStatus.READY;
-                        }
-                    }
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} port not ready",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace());
-                    return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
-                }
-                LOG.info(
-                        "JobManager deployment {} in namespace {} not yet ready, status {}",
-                        flinkApp.getMetadata().getName(),
-                        flinkApp.getMetadata().getNamespace(),
-                        status);
-
-                return JobManagerDeploymentStatus.DEPLOYING;
-            }
-            return JobManagerDeploymentStatus.MISSING;
-        }
-        return JobManagerDeploymentStatus.READY;
-    }
-
-    /**
-     * Shuts down the job and deletes all kubernetes resources including k8s HA resources. It will
-     * first perform a graceful shutdown (cancel) before deleting the data if that is unsuccessful
-     * in order to avoid leaking HA metadata to durable storage.
-     *
-     * <p>This feature is limited at the moment to cleaning up native kubernetes HA resources, other
-     * HA providers like ZK need to be cleaned up manually after deletion.
-     */
     public DeleteControl shutdownAndDelete(
-            String operatorNamespace,
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig) {
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
 
-        if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                == JobManagerDeploymentStatus.READY) {
+        if (JobManagerDeploymentStatus.READY
+                == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
             shutdown(flinkApp, effectiveConfig);

Review comment:
       No, the shutdown operation cleans up the resources (we should document this better in the code), so we only need to delete the ingress afterwards.







[GitHub] [flink-kubernetes-operator] tweise closed pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
tweise closed pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26


   





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1054011692


   > Maybe we could continue with this PR and create a dedicated ticket for refactoring/rethinking the controller flow, since we need some time to collect more feedback from the ML.
   
   Yes, I will keep this PR open and rename it so that it is dedicated to the refactoring discussion. Once I have addressed all the comments, I will move the first commit, which relates to the original ticket, to a new PR.





[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817364192



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -27,27 +27,82 @@
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /** Observes the actual state of the running jobs on the Flink cluster. */
-public class JobStatusObserver {
+public class Observer {
 
-    private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
 
     private final FlinkService flinkService;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public Observer(FlinkService flinkService) {
         this.flinkService = flinkService;
     }
 
-    public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(FlinkDeployment flinkApp, Context context) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+                // typically it takes a few seconds for the REST server to be ready
+                LOG.info(

Review comment:
       @gyfora where is the port check happening?







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1056499408


   @wangyang0918 @morhidi @Aitozi @bgeng777 
   If you have time to review this today, I would be happy to merge it at the end of the day.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053656843


   @tweise I rebased the PR. To reuse the functionality you added to the reconciler, I had to make some minor changes to the deployment checking.
   
   The reconciliation logic stays the same, but the deployment status check is now more explicit and reusable elsewhere.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053660192


   cc @wangyang0918 @morhidi 





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815602370



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -179,4 +180,22 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
+
+    @Override
+    protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        if (flinkApp.getStatus().getJobStatus() != null
+                && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+            try {
+                flinkService.cancelJob(
+                        JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
+                        UpgradeMode.STATELESS,
+                        effectiveConfig);
+                return;
+            } catch (Exception e) {
+                LOG.error("Could not shut down cluster gracefully, deleting...", e);
+            }
+        }
+
+        FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);

Review comment:
       My bad. I missed the `return` in the `try {...}`.
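
       For readers skimming the hunk above, the control flow can be reduced to the sketch below: attempt the graceful cancel, return early on success, and only fall through to the forced delete if there is no job ID or the cancel throws. `ShutdownSketch`, `FallibleStep` and the returned strings are hypothetical and exist only to make the two paths observable; the real method calls `flinkService.cancelJob(...)` and `FlinkUtils.deleteCluster(...)`.

```java
/** Hypothetical sketch of "try graceful cancel first, fall back to delete". */
final class ShutdownSketch {

    /** A step that may fail with a checked exception, like a REST call. */
    interface FallibleStep {
        void run() throws Exception;
    }

    /** Returns which path was taken, purely so the example is observable. */
    static String shutdown(String jobId, FallibleStep cancelJob, Runnable deleteCluster) {
        if (jobId != null) {
            try {
                cancelJob.run();
                return "cancelled"; // success: skip the forced delete below
            } catch (Exception e) {
                // Graceful path failed; fall through to the forced delete.
            }
        }
        deleteCluster.run();
        return "deleted";
    }
}
```

       The early `return` is what keeps the forced delete from running after a successful cancel.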







[GitHub] [flink-kubernetes-operator] tweise commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053913686


   @gyfora I'm having a hard time following what's going on here, similar to how hard it was to change the original reconciliation logic. Can we take a step back and discuss how we want this to work before any further code changes? I think after that we will also need to add more test coverage to assert the expected behavior.





[GitHub] [flink-kubernetes-operator] tweise commented on pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
tweise commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1057268719


   @gyfora with the latest from this PR, I don't see the deployment go into `READY` state. It just loops with `JobManager deployment basic-example in namespace default port ready, waiting for the REST API...`





[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815600043



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;

Review comment:
       There is typically a period of a few seconds between the port being ready and the REST server being ready, and if you hit that window, listing jobs will run into a timeout. Hence it is also necessary to distinguish existing deployments after an operator restart, where that isn't relevant. I think I will start covering these scenarios with test cases, since we cannot afford to keep moving things around without regression checks.
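
       The scenarios above could be pinned down with a small pure function that is easy to cover with tests. The sketch below is one possible model and rests on assumptions: the status names are taken from the diff, but `JmStatus`, `JmReadinessSketch`, `next` and `mayListJobs` are hypothetical, and treating a previously stored READY as "pre-existing deployment, skip the wait" is an assumption about the intended behavior rather than a description of the current code.

```java
/** Status names as they appear in the diff; the enum itself is a stand-in. */
enum JmStatus { MISSING, DEPLOYING, DEPLOYED_NOT_READY, READY }

/** Hypothetical sketch of the readiness transitions discussed above. */
final class JmReadinessSketch {

    /**
     * Computes the next status from the previously stored one and two observations.
     * The REST API is treated as usable only one observation after the port opened.
     */
    static JmStatus next(JmStatus previous, boolean replicasAvailable, boolean portReady) {
        if (previous == JmStatus.READY) {
            return JmStatus.READY; // e.g. pre-existing deployment after operator restart
        }
        if (previous == JmStatus.DEPLOYED_NOT_READY) {
            return JmStatus.READY; // port was open last time; REST server had time to start
        }
        if (!replicasAvailable || !portReady) {
            return JmStatus.DEPLOYING;
        }
        return JmStatus.DEPLOYED_NOT_READY; // port open, but do not list jobs yet
    }

    /** Listing jobs is only attempted once the deployment is READY. */
    static boolean mayListJobs(JmStatus status) {
        return status == JmStatus.READY;
    }
}
```

       Because the function has no side effects, each transition can be asserted directly in a regression test without a cluster.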







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1056579534


   I am getting to this PR now.





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817734199



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
##########
@@ -26,6 +28,8 @@
 @NoArgsConstructor
 @AllArgsConstructor
 public class FlinkDeploymentStatus {
-    private JobStatus jobStatus;
+    private JobStatus jobStatus = new JobStatus();

Review comment:
       Having a default JobStatus avoids a bunch of null checks. Also, now that you mention it, I simplified the status update in a commit: https://github.com/apache/flink-kubernetes-operator/pull/26/commits/d4924c0849976930d591a6f60ab77759a4582e5b
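
       As a minimal illustration of the point about null checks, the sketch below shows how a field initializer lets callers dereference the nested status directly. The class names are hypothetical, simplified stand-ins for `FlinkDeploymentStatus` and `JobStatus`, without Lombok.

```java
/** Hypothetical, simplified stand-in for the operator's JobStatus. */
final class JobStatusSketch {
    String jobId;
    String state;
}

/** Hypothetical, simplified stand-in for FlinkDeploymentStatus. */
final class DeploymentStatusSketch {
    // Field initializer: a freshly constructed status never has a null jobStatus.
    JobStatusSketch jobStatus = new JobStatusSketch();
}

final class NullCheckDemo {
    /** Only the inner field needs a null check; the outer one is always set. */
    static boolean hasJobId(DeploymentStatusSketch status) {
        return status.jobStatus.jobId != null;
    }
}
```

       Note that the default only protects objects built without an explicit value; a caller that explicitly passes null, for example through an all-args constructor, can still reintroduce it.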







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817602495



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -174,15 +159,17 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
                         effectiveConfig);
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
         jobStatus.setState("suspended");
-        removeDeployment(flinkApp);
+        flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
 
     @Override
     protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        if (flinkApp.getStatus().getJobStatus() != null
-                && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+        if (org.apache.flink.api.common.JobStatus.RUNNING

Review comment:
       I will add ignoreCase just to be safe. This still needs to be hardened.
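
       A sketch of the case-insensitive check being discussed, assuming the state is stored as a plain string (as in `jobStatus.setState("suspended")` above). The enum `JobState` is a local stand-in for `org.apache.flink.api.common.JobStatus` with only a few constants, and `isRunning` is a hypothetical helper name.

```java
class StateCheckSketch {
    // Local stand-in for Flink's JobStatus enum; only a subset of constants.
    enum JobState { CREATED, RUNNING, FINISHED }

    // Compares the enum name to the stored string, ignoring case.
    // String.equalsIgnoreCase(null) returns false, so a missing state
    // is treated as "not running" without an explicit null check.
    static boolean isRunning(String observedState) {
        return JobState.RUNNING.name().equalsIgnoreCase(observedState);
    }

    public static void main(String[] args) {
        System.out.println(isRunning("running"));
        System.out.println(isRunning("suspended"));
        System.out.println(isRunning(null));
    }
}
```

       Putting the enum constant on the left-hand side keeps the comparison null-safe; a stricter alternative would be to store the enum itself instead of a string, which is outside the scope of this sketch.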







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817698381



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,81 +47,22 @@ public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkServi
         this.flinkService = flinkService;
     }
 
-    public boolean removeDeployment(FlinkDeployment flinkApp) {
-        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
-    }
-
     public abstract UpdateControl<FlinkDeployment> reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
             Configuration effectiveConfig)
             throws Exception;
 
-    protected JobManagerDeploymentStatus checkJobManagerDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
-            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
-            if (deployment.isPresent()) {
-                DeploymentStatus status = deployment.get().getStatus();
-                DeploymentSpec spec = deployment.get().getSpec();
-                if (status != null
-                        && status.getAvailableReplicas() != null
-                        && spec.getReplicas().intValue() == status.getReplicas()
-                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
-                    // typically it takes a few seconds for the REST server to be ready
-                    if (flinkService.isJobManagerPortReady(effectiveConfig)) {
-                        LOG.info(
-                                "JobManager deployment {} in namespace {} is ready",
-                                flinkApp.getMetadata().getName(),
-                                flinkApp.getMetadata().getNamespace());
-                        jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return JobManagerDeploymentStatus.READY;
-                        }
-                    }
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} port not ready",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace());
-                    return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
-                }
-                LOG.info(
-                        "JobManager deployment {} in namespace {} not yet ready, status {}",
-                        flinkApp.getMetadata().getName(),
-                        flinkApp.getMetadata().getNamespace(),
-                        status);
-
-                return JobManagerDeploymentStatus.DEPLOYING;
-            }
-            return JobManagerDeploymentStatus.MISSING;
-        }
-        return JobManagerDeploymentStatus.READY;
-    }
-
-    /**
-     * Shuts down the job and deletes all kubernetes resources including k8s HA resources. It will
-     * first perform a graceful shutdown (cancel) before deleting the data if that is unsuccessful
-     * in order to avoid leaking HA metadata to durable storage.
-     *
-     * <p>This feature is limited at the moment to cleaning up native kubernetes HA resources, other
-     * HA providers like ZK need to be cleaned up manually after deletion.
-     */
     public DeleteControl shutdownAndDelete(
-            String operatorNamespace,
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig) {
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
 
-        if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                == JobManagerDeploymentStatus.READY) {
+        if (JobManagerDeploymentStatus.READY
+                == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
             shutdown(flinkApp, effectiveConfig);
         } else {
             FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
         }

Review comment:
       Got it. I missed the use in `waitForClusterShutdown`; it should work fine since we set `deleteHaConfigmaps` to true here. It seems we could refine `stopSessionCluster` to skip the unnecessary `waitForClusterShutdown` when `deleteHaConfigmaps` is true, but that needn't be included in this PR. This PR LGTM. Thanks a lot for the work!







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817477134



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
##########
@@ -44,12 +44,12 @@
     public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
         switch (this) {
             case DEPLOYING:
+            case READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
             case DEPLOYED_NOT_READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
-            case READY:
             case MISSING:

Review comment:
       Could this case branch be removed?







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817845987



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()
+                    && flinkService.isJobManagerPortReady(effectiveConfig)) {
+
+                // typically it takes a few seconds for the REST server to be ready
+                LOG.info(
+                        "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                        flinkApp.getMetadata().getName(),
+                        flinkApp.getMetadata().getNamespace());
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                return;
+            }
+            LOG.info(
+                    "JobManager deployment {} in namespace {} exists but not ready yet, status {}",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace(),
+                    status);
+
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+            return;
+        }
+
+        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+    }
+
+    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+
+        // No need to observe job status for session clusters
+        if (flinkApp.getSpec().getJob() == null) {
+            return true;
+        }
+
+        LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
+        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+
+        Collection<JobStatusMessage> clusterJobStatuses;
+        try {
+            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+        } catch (Exception e) {
+            LOG.error("Exception while listing jobs", e);
+            flinkAppStatus.getJobStatus().setState("UNKNOWN");

Review comment:
       @gyfora, thanks for clearing up my confusion. I see your point here.







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815570175



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;
                     }
                     LOG.info(
                             "JobManager deployment {} in namespace {} port not ready",
                             flinkApp.getMetadata().getName(),
                             flinkApp.getMetadata().getNamespace());
-                    return UpdateControl.updateStatus(flinkApp)
-                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                    return JobDeploymentStatus.DEPLOYED_NOT_READY;
                 }
                 LOG.info(
                         "JobManager deployment {} in namespace {} not yet ready, status {}",
                         flinkApp.getMetadata().getName(),
                         flinkApp.getMetadata().getNamespace(),
                         status);
-                // TODO: how frequently do we want here
-                return UpdateControl.updateStatus(flinkApp)
-                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+                return JobDeploymentStatus.DEPLOYING;
             }
         }
-        return null;
+        return JobDeploymentStatus.MISSING;

Review comment:
       This is indeed confusing. It previously meant that the deployment is ready and subsequent checks can proceed.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817423845



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -27,27 +27,82 @@
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /** Observes the actual state of the running jobs on the Flink cluster. */
-public class JobStatusObserver {
+public class Observer {
 
-    private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
 
     private final FlinkService flinkService;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public Observer(FlinkService flinkService) {
         this.flinkService = flinkService;
     }
 
-    public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(FlinkDeployment flinkApp, Context context) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+                // typically it takes a few seconds for the REST server to be ready
+                LOG.info(

Review comment:
       Good catch.
   
   I fixed this error and added tests for all the state transitions in the observer, covering port checking etc.
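
   The state transitions under test can be sketched as a pure function. This is an illustration only: it mirrors the branch order of the `observeJmDeployment` version quoted elsewhere in this thread (the one that includes the port check), but `JmTransitionSketch`, `next` and its boolean parameters are hypothetical names, not the operator's API.

```java
class JmTransitionSketch {
    enum JmStatus { MISSING, DEPLOYING, DEPLOYED_NOT_READY, READY }

    // Computes the next status from the previously recorded status and
    // three observations about the JobManager deployment.
    static JmStatus next(
            JmStatus previous,
            boolean deploymentExists,
            boolean replicasAvailable,
            boolean portReady) {
        if (previous == JmStatus.READY) {
            return JmStatus.READY; // nothing to re-check once ready
        }
        if (previous == JmStatus.DEPLOYED_NOT_READY) {
            return JmStatus.READY; // port was ready on the previous pass
        }
        if (deploymentExists) {
            if (replicasAvailable && portReady) {
                // REST server typically needs a little longer than the port
                return JmStatus.DEPLOYED_NOT_READY;
            }
            return JmStatus.DEPLOYING;
        }
        return JmStatus.MISSING;
    }

    public static void main(String[] args) {
        JmStatus s = JmStatus.MISSING;
        s = next(s, true, false, false);
        System.out.println(s);
        s = next(s, true, true, true);
        System.out.println(s);
        s = next(s, true, true, true);
        System.out.println(s);
    }
}
```

   Writing the transition as a function of plain values makes each branch, including the port check, easy to cover with a table of inputs and expected outputs.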







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817499383



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
##########
@@ -44,12 +44,12 @@
     public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
         switch (this) {
             case DEPLOYING:
+            case READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
             case DEPLOYED_NOT_READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
-            case READY:
             case MISSING:

Review comment:
       Which branch are you suggesting be removed, @SteNicholas? The "MISSING" one?







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817499383



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
##########
@@ -44,12 +44,12 @@
     public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
         switch (this) {
             case DEPLOYING:
+            case READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
             case DEPLOYED_NOT_READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
-            case READY:
             case MISSING:

Review comment:
       Which branch are you suggesting be removed? The "MISSING" one?







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815570605



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;
                     }
                     LOG.info(
                             "JobManager deployment {} in namespace {} port not ready",
                             flinkApp.getMetadata().getName(),
                             flinkApp.getMetadata().getNamespace());
-                    return UpdateControl.updateStatus(flinkApp)
-                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                    return JobDeploymentStatus.DEPLOYED_NOT_READY;
                 }
                 LOG.info(
                         "JobManager deployment {} in namespace {} not yet ready, status {}",
                         flinkApp.getMetadata().getName(),
                         flinkApp.getMetadata().getNamespace(),
                         status);
-                // TODO: how frequently do we want here
-                return UpdateControl.updateStatus(flinkApp)
-                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+                return JobDeploymentStatus.DEPLOYING;
             }
         }
-        return null;
+        return JobDeploymentStatus.MISSING;

Review comment:
       According to that it should be `return JobDeploymentStatus.READY`?







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815583564



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;
                     }
                     LOG.info(
                             "JobManager deployment {} in namespace {} port not ready",
                             flinkApp.getMetadata().getName(),
                             flinkApp.getMetadata().getNamespace());
-                    return UpdateControl.updateStatus(flinkApp)
-                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                    return JobDeploymentStatus.DEPLOYED_NOT_READY;
                 }
                 LOG.info(
                         "JobManager deployment {} in namespace {} not yet ready, status {}",
                         flinkApp.getMetadata().getName(),
                         flinkApp.getMetadata().getNamespace(),
                         status);
-                // TODO: how frequently do we want here
-                return UpdateControl.updateStatus(flinkApp)
-                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+                return JobDeploymentStatus.DEPLOYING;
             }
         }
-        return null;
+        return JobDeploymentStatus.MISSING;

Review comment:
       This is indeed a mistake, fixing it.
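
       For illustration, a simplified sketch of the intended control flow. The class `DeploymentCheckSketch`, the method `check` and its boolean parameters are hypothetical; only the branch structure follows the quoted diff. The point is that the final return is reached only when the UID is already tracked as ready, so it should yield READY, while MISSING belongs inside the untracked branch.

```java
import java.util.HashSet;
import java.util.Set;

class DeploymentCheckSketch {
    enum Status { MISSING, DEPLOYING, DEPLOYED_NOT_READY, READY }

    // UIDs of deployments that were already observed as ready.
    private final Set<String> readyDeployments = new HashSet<>();

    Status check(String uid, boolean exists, boolean replicasAvailable, boolean portReady) {
        if (!readyDeployments.contains(uid)) {
            if (exists) {
                if (replicasAvailable) {
                    if (portReady) {
                        readyDeployments.add(uid);
                        return Status.READY;
                    }
                    return Status.DEPLOYED_NOT_READY;
                }
                return Status.DEPLOYING;
            }
            // Untracked and no deployment found.
            return Status.MISSING;
        }
        // Already tracked: the old code returned null here to mean
        // "ready, carry on", so the enum equivalent is READY.
        return Status.READY;
    }

    public static void main(String[] args) {
        DeploymentCheckSketch sketch = new DeploymentCheckSketch();
        System.out.println(sketch.check("uid-1", true, true, true));
        System.out.println(sketch.check("uid-1", false, false, false));
    }
}
```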







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815596037



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;

Review comment:
       JobDeploymentStatus.READY means that the deployment and ports are ready. In that case there should not be any timeout. Did I overlook something?







[GitHub] [flink-kubernetes-operator] gyfora edited a comment on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora edited a comment on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053918360


   I agree this would be a good time to discuss and agree on a few design principles. 
   
   In my view the Validate -> Observe -> Reconcile flow is very natural and easy to understand.  We want to keep each of these components simple and self contained as much as possible. 
   
   The commits that introduced jobmanager deployment validation (which is a very important, core feature) removed the original observer -> reconcile logic and moved part of the observing to the reconciler. To me this made the whole reconcile very complex and hard to understand with null checks and logic duplication.
   
   I think it leads to a simpler design to have a separation of concerns where the Observer component can describe the status of the deployment and the job, and the reconciler can act on it.
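
A minimal sketch of the separation described above, for illustration only. `SketchApp` and `ControlLoopSketch` are hypothetical stand-ins and not the operator's real classes; the only point is that the observer writes status and the reconciler reads it.

```java
// Hypothetical stand-ins for illustration; not the operator's real classes.
final class SketchApp {
    final int parallelism;   // a spec field the validator checks
    boolean clusterReady;    // status written only by the observer

    SketchApp(int parallelism) {
        this.parallelism = parallelism;
    }
}

final class ControlLoopSketch {

    // Validate: inspects the spec only, never the cluster.
    static boolean validate(SketchApp app) {
        return app.parallelism > 0;
    }

    // Observe: records what was seen in the status and takes no action.
    static void observe(SketchApp app, boolean clusterReadyNow) {
        app.clusterReady = clusterReadyNow;
    }

    // Reconcile: decides purely from the recorded status.
    static String reconcile(SketchApp app) {
        return app.clusterReady ? "apply-spec-changes" : "wait";
    }

    // One pass of the loop: Validate -> Observe -> Reconcile.
    static String runOnce(SketchApp app, boolean clusterReadyNow) {
        if (!validate(app)) {
            return "reject";
        }
        observe(app, clusterReadyNow);
        return reconcile(app);
    }

    public static void main(String[] args) {
        System.out.println(runOnce(new SketchApp(2), true));
        System.out.println(runOnce(new SketchApp(2), false));
        System.out.println(runOnce(new SketchApp(0), true));
    }
}
```

With this shape the reconciler needs no null checks on cluster state, because it only ever reads the status the observer has already filled in.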
   
   If there is disagreement here we should probably move this to the mailing list for further discussion and visibility.
   
   @tweise @wangyang0918 @morhidi  @Aitozi 





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817676014



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,81 +47,22 @@ public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkServi
         this.flinkService = flinkService;
     }
 
-    public boolean removeDeployment(FlinkDeployment flinkApp) {
-        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
-    }
-
     public abstract UpdateControl<FlinkDeployment> reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
             Configuration effectiveConfig)
             throws Exception;
 
-    protected JobManagerDeploymentStatus checkJobManagerDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
-            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
-            if (deployment.isPresent()) {
-                DeploymentStatus status = deployment.get().getStatus();
-                DeploymentSpec spec = deployment.get().getSpec();
-                if (status != null
-                        && status.getAvailableReplicas() != null
-                        && spec.getReplicas().intValue() == status.getReplicas()
-                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
-                    // typically it takes a few seconds for the REST server to be ready
-                    if (flinkService.isJobManagerPortReady(effectiveConfig)) {
-                        LOG.info(
-                                "JobManager deployment {} in namespace {} is ready",
-                                flinkApp.getMetadata().getName(),
-                                flinkApp.getMetadata().getNamespace());
-                        jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return JobManagerDeploymentStatus.READY;
-                        }
-                    }
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} port not ready",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace());
-                    return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
-                }
-                LOG.info(
-                        "JobManager deployment {} in namespace {} not yet ready, status {}",
-                        flinkApp.getMetadata().getName(),
-                        flinkApp.getMetadata().getNamespace(),
-                        status);
-
-                return JobManagerDeploymentStatus.DEPLOYING;
-            }
-            return JobManagerDeploymentStatus.MISSING;
-        }
-        return JobManagerDeploymentStatus.READY;
-    }
-
-    /**
-     * Shuts down the job and deletes all kubernetes resources including k8s HA resources. It will
-     * first perform a graceful shutdown (cancel) before deleting the data if that is unsuccessful
-     * in order to avoid leaking HA metadata to durable storage.
-     *
-     * <p>This feature is limited at the moment to cleaning up native kubernetes HA resources, other
-     * HA providers like ZK need to be cleaned up manually after deletion.
-     */
     public DeleteControl shutdownAndDelete(
-            String operatorNamespace,
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig) {
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
 
-        if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                == JobManagerDeploymentStatus.READY) {
+        if (JobManagerDeploymentStatus.READY
+                == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
             shutdown(flinkApp, effectiveConfig);
         } else {
             FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
         }

Review comment:
       FlinkUtils.deleteCluster will call waitForClusterShutdown internally if deleteHaConfigmaps is set to true. This was introduced recently to clean up properly on deletion.
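
A rough sketch of the deletion path discussed here, assuming a hypothetical `ClusterActions` interface in place of `FlinkService` and `FlinkUtils`. It folds the cancel-then-fallback logic of `shutdown` into one method for brevity, so it is not the actual implementation.

```java
final class ShutdownSketch {

    // Hypothetical abstraction over the real cancel and delete calls.
    interface ClusterActions {
        void cancelJob() throws Exception;

        void deleteCluster(boolean deleteHaConfigmaps);
    }

    // Try a graceful cancel when the JobManager is ready; otherwise, or if the
    // cancel fails, delete the cluster together with its HA configmaps.
    static String shutdownAndDelete(boolean jobManagerReady, ClusterActions actions) {
        if (jobManagerReady) {
            try {
                actions.cancelJob();
                return "cancelled";
            } catch (Exception e) {
                // Graceful shutdown failed; fall through to the forced delete.
            }
        }
        actions.deleteCluster(true);
        return "deleted";
    }
}
```

The forced branch always passes `true`, which matches the `FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true)` call in the hunk above.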







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817596086



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -174,15 +159,17 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
                         effectiveConfig);
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
         jobStatus.setState("suspended");
-        removeDeployment(flinkApp);
+        flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
 
     @Override
     protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        if (flinkApp.getStatus().getJobStatus() != null
-                && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+        if (org.apache.flink.api.common.JobStatus.RUNNING

Review comment:
       Maybe we do not need to ignore case. And this could be improved in FLINK-26178.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817652188



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    deploymentStatus.setJobManagerDeploymentStatus(

Review comment:
       This is now highlighted in the observer test: https://github.com/apache/flink-kubernetes-operator/pull/26/files#diff-ac210d80370718839b3bdfc124a9dfc2ed32b06e35ca71474efd5a9f27203595R92-R116
   







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053290386


   cc @tweise @wangyang0918 





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815582399



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,11 +65,8 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
             Configuration effectiveConfig)
             throws Exception;
 
-    protected UpdateControl<FlinkDeployment> checkJobManagerDeployment(
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig,
-            FlinkService flinkService) {
+    protected JobDeploymentStatus checkJobManagerDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
         if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {

Review comment:
       Yes. We indeed do not want to repeat the port-ready check on every reconciliation; it is a heavy operation. Looking forward to the improvement in FLINK-26261.
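
To illustrate why the `jobManagerDeployments.contains(...)` guard matters, here is a small sketch of caching a successful readiness probe per deployment UID. `ReadinessCache` and the probe parameter are hypothetical names, not part of the operator.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical helper: remembers which deployments already passed the port check.
final class ReadinessCache {

    private final Set<String> readyUids = ConcurrentHashMap.newKeySet();

    // Runs the (expensive) probe only while the UID is not yet known to be ready.
    boolean isReady(String uid, BooleanSupplier portProbe) {
        if (readyUids.contains(uid)) {
            return true;
        }
        if (portProbe.getAsBoolean()) {
            readyUids.add(uid);
            return true;
        }
        return false;
    }

    // Drops the cached entry, for example when the deployment is removed.
    boolean forget(String uid) {
        return readyUids.remove(uid);
    }
}
```

Once a UID is cached, later reconciliations skip the probe until `forget` is called for that UID.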







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815591556



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -179,4 +180,22 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
+
+    @Override
+    protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        if (flinkApp.getStatus().getJobStatus() != null
+                && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+            try {
+                flinkService.cancelJob(
+                        JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
+                        UpgradeMode.STATELESS,
+                        effectiveConfig);
+                return;
+            } catch (Exception e) {
+                LOG.error("Could not shut down cluster gracefully, deleting...", e);
+            }
+        }
+
+        FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);

Review comment:
       Do you mean we should delete it always?







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817504020



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            FlinkDeploymentSpec lastReconciledSpec =
+                    deploymentStatus.getReconciliationStatus().getLastReconciledSpec();
+
+            // If the job was successfully suspended we set to MISSING status, otherwise keep READY.
+            if (lastReconciledSpec != null
+                    && lastReconciledSpec.getJob() != null
+                    && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED) {
+
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+            }
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    deploymentStatus.setJobManagerDeploymentStatus(
+                            JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                    return;
+                } else {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(

Review comment:
       Maybe it would be better to log this when the port becomes ready, I agree :)







[GitHub] [flink-kubernetes-operator] tweise commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053635981


   Looks good, will review after it is rebased since there is a larger conflict now.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053918360


   I agree this would be a good time to discuss and agree on a few design principles. 
   
   In my view the Validate -> Observe -> Reconcile flow is very natural and easy to understand. We want to keep each of these components as simple and self-contained as possible.
   
   The commits that introduced JobManager deployment validation (which is a very important, core feature) removed the original observer -> reconcile logic and moved part of the observing to the reconciler. To me, this made the whole reconcile step very complex and hard to understand, with null checks and duplicated logic.
   
   I think it leads to a simpler design to have a separation of concerns where the Observer component can describe the status of the deployment and the job, and the reconciler can act on it.
   
   If there is disagreement here we should probably move this to the mailing list for further discussion and visibility.
   





[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817482046



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            FlinkDeploymentSpec lastReconciledSpec =
+                    deploymentStatus.getReconciliationStatus().getLastReconciledSpec();
+
+            // If the job was successfully suspended we set to MISSING status, otherwise keep READY.
+            if (lastReconciledSpec != null
+                    && lastReconciledSpec.getJob() != null
+                    && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED) {
+
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+            }
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    deploymentStatus.setJobManagerDeploymentStatus(
+                            JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                    return;
+                } else {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(

Review comment:
       Does this need to wait for the port to be ready? IMO, this could wait for the REST server to be ready and then change the `JobManagerDeploymentStatus` to `DEPLOYED_NOT_READY`.
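
For reference, the status transitions visible in the hunk above can be sketched as a pure function. The enum and class names are hypothetical. The branch for a suspended job is omitted, and the cases not shown in the truncated hunk (deployment absent, replicas not available, port not ready) are assumptions modelled on the earlier `checkJobManagerDeployment`.

```java
// Hypothetical mirror of the statuses named in the diff.
enum JmStatusSketch { MISSING, DEPLOYING, DEPLOYED_NOT_READY, READY }

final class JmStatusTransitions {

    // Computes the next status from the previous one plus the current observation.
    static JmStatusSketch next(
            JmStatusSketch previous,
            boolean deploymentPresent,
            boolean replicasAvailable,
            boolean portReady) {
        if (previous == JmStatusSketch.READY) {
            return JmStatusSketch.READY; // nothing left to observe
        }
        if (previous == JmStatusSketch.DEPLOYED_NOT_READY) {
            // Port was ready on the previous pass; one reschedule later we call it READY.
            return JmStatusSketch.READY;
        }
        if (!deploymentPresent) {
            return JmStatusSketch.MISSING; // assumption: branch not shown in the hunk
        }
        if (replicasAvailable && portReady) {
            return JmStatusSketch.DEPLOYED_NOT_READY;
        }
        return JmStatusSketch.DEPLOYING; // assumption: branch not shown in the hunk
    }
}
```

Seen this way, `DEPLOYED_NOT_READY` means "port is up, give the REST server one more delay", which is the waiting step the question above is about.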







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817500860



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
##########
@@ -44,12 +44,12 @@
     public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
         switch (this) {
             case DEPLOYING:
+            case READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
             case DEPLOYED_NOT_READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
-            case READY:
             case MISSING:

Review comment:
       @gyfora Yes.







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817609226



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    deploymentStatus.setJobManagerDeploymentStatus(

Review comment:
       What will happen if all the replicas are running but the REST port is not reachable? We will end up with `MISSING`, not `DEPLOYED_NOT_READY`.
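
       To make the question concrete, here is a simplified, self-contained model of the status decision. It is only a sketch, not the operator's code: `JmStatusSketch`, `JmStatus` and `decide` are hypothetical names, and plain values replace the Kubernetes Deployment object and the REST port probe. It assumes the desired outcome for "replicas available, port closed" is to stay in `DEPLOYING`, which is what the 162-line revision of this hunk quoted elsewhere in this thread does by folding the port check into the same condition.

```java
public class JmStatusSketch {

    /** Hypothetical mirror of the four JobManager deployment states named in the diff. */
    public enum JmStatus { MISSING, DEPLOYING, DEPLOYED_NOT_READY, READY }

    /**
     * Decides the next status from plain inputs (assumed stand-ins for the
     * Deployment spec/status and the port probe).
     */
    public static JmStatus decide(
            boolean deploymentExists,
            int specReplicas,
            Integer availableReplicas,
            boolean portReady) {
        if (!deploymentExists) {
            // Only a deployment that does not exist at all is MISSING.
            return JmStatus.MISSING;
        }
        boolean replicasAvailable =
                availableReplicas != null && availableReplicas == specReplicas;
        if (replicasAvailable && portReady) {
            // Port is open; the REST API usually needs a few more seconds.
            return JmStatus.DEPLOYED_NOT_READY;
        }
        // Deployment exists but replicas or port are not ready yet.
        return JmStatus.DEPLOYING;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, 1, 1, false));    // DEPLOYING
        System.out.println(decide(true, 1, 1, true));     // DEPLOYED_NOT_READY
        System.out.println(decide(false, 1, null, false)); // MISSING
    }
}
```

       With a single combined condition, an unreachable port can never fall through to the `MISSING` branch.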







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817822871



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()
+                    && flinkService.isJobManagerPortReady(effectiveConfig)) {
+
+                // typically it takes a few seconds for the REST server to be ready
+                LOG.info(
+                        "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                        flinkApp.getMetadata().getName(),
+                        flinkApp.getMetadata().getNamespace());
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                return;
+            }
+            LOG.info(
+                    "JobManager deployment {} in namespace {} exists but not ready yet, status {}",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace(),
+                    status);
+
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+            return;
+        }
+
+        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+    }
+
+    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+
+        // No need to observe job status for session clusters
+        if (flinkApp.getSpec().getJob() == null) {
+            return true;
+        }
+
+        LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
+        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+
+        Collection<JobStatusMessage> clusterJobStatuses;
+        try {
+            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+        } catch (Exception e) {
+            LOG.error("Exception while listing jobs", e);
+            flinkAppStatus.getJobStatus().setState("UNKNOWN");

Review comment:
       I'm not sure whether we should set the state to `UNKNOWN` when listing jobs throws an exception. IMO, an exception while listing jobs is the same as no jobs being found, so it shouldn't update the job status.
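
       A minimal sketch of the alternative raised here, under the assumption that a failed listing should behave like an empty one and leave the last known state untouched. `ListJobsSketch`, `JobState` and `observe` are hypothetical names; a `Callable` stands in for the `flinkService.listJobs(...)` call.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;

public class ListJobsSketch {

    /** Hypothetical mutable holder standing in for the job status in the custom resource. */
    public static final class JobState {
        public String state = "RUNNING";
    }

    /**
     * Returns true only when a non-empty job list was fetched.
     * On failure or an empty list the stored state is left as it was.
     */
    public static boolean observe(JobState status, Callable<Collection<String>> listJobs) {
        Collection<String> jobs;
        try {
            jobs = listJobs.call();
        } catch (Exception e) {
            // Treated like "no jobs found": keep the last known state.
            return false;
        }
        if (jobs.isEmpty()) {
            return false;
        }
        status.state = jobs.iterator().next();
        return true;
    }

    public static void main(String[] args) {
        JobState s = new JobState();
        boolean first = observe(s, () -> { throw new IllegalStateException("REST API down"); });
        System.out.println(first + " " + s.state);  // false RUNNING
        boolean second = observe(s, () -> Collections.singletonList("FINISHED"));
        System.out.println(second + " " + s.state); // true FINISHED
    }
}
```

       The trade-off is that a stale state stays visible while the REST API is unreachable, which is the behavior an explicit `UNKNOWN` avoids.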







[GitHub] [flink-kubernetes-operator] gyfora edited a comment on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora edited a comment on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053901452


   Thank you @tweise and @wangyang0918, I will address the comments in the original commit.
   
   I have now pushed an optional refactor commit that restores the observe/reconcile flow that we had before @tweise's recent change, while making it clearer and including the JobManager deployment validation.
   
   I would like to hear your opinion about this. I think this makes the reconciler simpler and the flow cleaner but this is an optional change if you feel otherwise.
   
   I think the key points are:
    - Remove update control null checking in both session and job reconciler
    - Observation of both deployment and job status is centralized in the Observer
    - Clear flow on when reconciliation should happen 
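
   The flow described by these points can be sketched roughly as follows. This is an illustration only: `ControllerFlowSketch`, its nested `Observer` and `Reconciler` interfaces and `runOnce` are hypothetical names, and a string stands in for the `FlinkDeployment` resource. The observer refreshes the status and reports whether reconciliation should run; the controller calls the reconciler only in that case.

```java
import java.util.ArrayList;
import java.util.List;

public class ControllerFlowSketch {

    /** Hypothetical observer: refreshes status, returns true when ready to reconcile. */
    public interface Observer {
        boolean observe(String app);
    }

    /** Hypothetical reconciler: moves the cluster toward the desired spec. */
    public interface Reconciler {
        void reconcile(String app);
    }

    /** Runs one control-loop pass and returns the steps taken, in order. */
    public static List<String> runOnce(String app, Observer observer, Reconciler reconciler) {
        List<String> steps = new ArrayList<>();
        steps.add("observe");
        boolean ready = observer.observe(app);
        if (ready) {
            steps.add("reconcile");
            reconciler.reconcile(app);
        } else {
            // Not ready yet: update status only and try again later.
            steps.add("reschedule");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(runOnce("demo", app -> true, app -> { }));  // [observe, reconcile]
        System.out.println(runOnce("demo", app -> false, app -> { })); // [observe, reschedule]
    }
}
```

   Because the decision to reconcile is a single boolean from the observer, the reconcilers no longer need to return or null-check an update control themselves.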





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815587910



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -86,16 +85,14 @@ public FlinkDeploymentController(
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-        LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName());
-        FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
-        IngressUtils.updateIngressRules(
-                flinkApp,
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()),
-                operatorNamespace,
-                kubernetesClient,
-                true);
+        LOG.info("Stopping cluster {}", flinkApp.getMetadata().getName());
         getReconciler(flinkApp).removeDeployment(flinkApp);

Review comment:
       good catch







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053988753


   For the sake of transparency I have moved this to a discussion thread on the dev list: https://lists.apache.org/thread/ydvdxfn8go95gnmmh5k9ggwx9hwxn6tl
   
   Let's try to come to an agreement there together before continuing! :)





[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817808334



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()
+                    && flinkService.isJobManagerPortReady(effectiveConfig)) {
+
+                // typically it takes a few seconds for the REST server to be ready
+                LOG.info(
+                        "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                        flinkApp.getMetadata().getName(),
+                        flinkApp.getMetadata().getNamespace());
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                return;
+            }
+            LOG.info(
+                    "JobManager deployment {} in namespace {} exists but not ready yet, status {}",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace(),
+                    status);
+
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+            return;
+        }
+
+        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+    }
+
+    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+
+        // No need to observe job status for session clusters
+        if (flinkApp.getSpec().getJob() == null) {
+            return true;
+        }
+
+        LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
+        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+
+        Collection<JobStatusMessage> clusterJobStatuses;
+        try {
+            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+        } catch (Exception e) {
+            LOG.error("Exception while listing jobs", e);
+            flinkAppStatus.getJobStatus().setState("UNKNOWN");
+            return false;
+        }
+        if (clusterJobStatuses.isEmpty()) {
+            LOG.info("No jobs found on {} yet", flinkApp.getMetadata().getName());
+            return false;
+        } else {
+            updateJobStatus(flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses));
+            LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
+            return true;
+        }
+    }
+
+    private boolean isReadyToReconcile(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        JobManagerDeploymentStatus jmDeploymentStatus =
+                flinkApp.getStatus().getJobManagerDeploymentStatus();
+
+        switch (jmDeploymentStatus) {
+            case READY:
+                return observeFlinkJobStatus(flinkApp, effectiveConfig);
+            case MISSING:
+                return true;
+            case DEPLOYING:
+            case DEPLOYED_NOT_READY:
+                return false;
+            default:
+                throw new RuntimeException("Unknown status: " + jmDeploymentStatus);
+        }
+    }
+
+    /** Update previous job status based on the job list from the cluster. */
+    private void updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
+        Collections.sort(
+                clusterJobStatuses,
+                (j1, j2) -> -1 * Long.compare(j1.getStartTime(), j2.getStartTime()));

Review comment:
       ```suggestion
                   (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime()));
   ```
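
       The suggested comparator orders jobs newest-first just like the original; swapping the arguments simply avoids the multiplication by -1. A small standalone check follows, with a hypothetical `Job` class standing in for `JobStatusMessage` (only the start time matters here). `Comparator.comparingLong(...).reversed()` from the JDK is shown as a third equivalent form.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class StartTimeOrderSketch {

    /** Hypothetical stand-in for JobStatusMessage. */
    public static final class Job {
        final String name;
        final long startTime;

        public Job(String name, long startTime) {
            this.name = name;
            this.startTime = startTime;
        }

        long getStartTime() {
            return startTime;
        }
    }

    /** Form used in the diff: negate the ascending comparison. */
    public static final Comparator<Job> NEGATED =
            (j1, j2) -> -1 * Long.compare(j1.getStartTime(), j2.getStartTime());

    /** Form from the review suggestion: swap the arguments. */
    public static final Comparator<Job> SWAPPED =
            (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime());

    /** JDK helper that expresses the same descending order. */
    public static final Comparator<Job> REVERSED =
            Comparator.<Job>comparingLong(Job::getStartTime).reversed();

    /** Sorts a copy of the list and returns the job names in the resulting order. */
    public static List<String> sortedNames(List<Job> jobs, Comparator<Job> order) {
        List<Job> copy = new ArrayList<>(jobs);
        copy.sort(order);
        List<String> names = new ArrayList<>();
        for (Job j : copy) {
            names.add(j.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Job> jobs = Arrays.asList(new Job("a", 10L), new Job("b", 30L), new Job("c", 20L));
        System.out.println(sortedNames(jobs, NEGATED));  // [b, c, a]
        System.out.println(sortedNames(jobs, SWAPPED));  // [b, c, a]
        System.out.println(sortedNames(jobs, REVERSED)); // [b, c, a]
    }
}
```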







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817596086



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -174,15 +159,17 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
                         effectiveConfig);
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
         jobStatus.setState("suspended");
-        removeDeployment(flinkApp);
+        flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
 
     @Override
     protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        if (flinkApp.getStatus().getJobStatus() != null
-                && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+        if (org.apache.flink.api.common.JobStatus.RUNNING

Review comment:
       Maybe we do not need to ignore case.
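
       For illustration, and assuming the truncated condition above compares the stored state string with the enum constant's name, the difference between the two comparisons looks like this. `StateCompareSketch` and its `State` enum are hypothetical stand-ins, not Flink classes.

```java
public class StateCompareSketch {

    /** Hypothetical stand-in for the job status enum. */
    public enum State { RUNNING, FAILED, FINISHED }

    /** Exact match: enough if the stored string always comes from State.name(). */
    public static boolean isRunningExact(String stored) {
        return State.RUNNING.name().equals(stored);
    }

    /** Case-insensitive match: only needed if the stored casing can vary. */
    public static boolean isRunningIgnoreCase(String stored) {
        return State.RUNNING.name().equalsIgnoreCase(stored);
    }

    public static void main(String[] args) {
        System.out.println(isRunningExact("RUNNING"));      // true
        System.out.println(isRunningExact("running"));      // false
        System.out.println(isRunningIgnoreCase("running")); // true
    }
}
```

       If the observer is the only writer of the state and always stores the enum name, the exact comparison is sufficient.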







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817828345



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()
+                    && flinkService.isJobManagerPortReady(effectiveConfig)) {
+
+                // typically it takes a few seconds for the REST server to be ready
+                LOG.info(
+                        "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                        flinkApp.getMetadata().getName(),
+                        flinkApp.getMetadata().getNamespace());
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                return;
+            }
+            LOG.info(
+                    "JobManager deployment {} in namespace {} exists but not ready yet, status {}",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace(),
+                    status);
+
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+            return;
+        }
+
+        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+    }
+
+    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+
+        // No need to observe job status for session clusters
+        if (flinkApp.getSpec().getJob() == null) {
+            return true;
+        }
+
+        LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
+        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+
+        Collection<JobStatusMessage> clusterJobStatuses;
+        try {
+            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+        } catch (Exception e) {
+            LOG.error("Exception while listing jobs", e);
+            flinkAppStatus.getJobStatus().setState("UNKNOWN");
+            return false;
+        }
+        if (clusterJobStatuses.isEmpty()) {
+            LOG.info("No jobs found on {} yet", flinkApp.getMetadata().getName());
+            return false;
+        } else {
+            updateJobStatus(flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses));
+            LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
+            return true;
+        }
+    }
+
+    private boolean isReadyToReconcile(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        JobManagerDeploymentStatus jmDeploymentStatus =
+                flinkApp.getStatus().getJobManagerDeploymentStatus();
+
+        switch (jmDeploymentStatus) {
+            case READY:
+                return observeFlinkJobStatus(flinkApp, effectiveConfig);
+            case MISSING:
+                return true;
+            case DEPLOYING:
+            case DEPLOYED_NOT_READY:
+                return false;
+            default:
+                throw new RuntimeException("Unknown status: " + jmDeploymentStatus);
+        }
+    }
+
+    /** Update previous job status based on the job list from the cluster. */
+    private void updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
+        Collections.sort(
+                clusterJobStatuses,
+                (j1, j2) -> -1 * Long.compare(j1.getStartTime(), j2.getStartTime()));

Review comment:
       @gyfora, this change causes the checkstyle check to fail.







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1054008848


   Maybe we could continue with this PR and create a dedicated ticket for refactoring/rethinking the controller flow, since we need some time to collect more feedback from the mailing list.





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815556799



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;
                     }
                     LOG.info(
                             "JobManager deployment {} in namespace {} port not ready",
                             flinkApp.getMetadata().getName(),
                             flinkApp.getMetadata().getNamespace());
-                    return UpdateControl.updateStatus(flinkApp)
-                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                    return JobDeploymentStatus.DEPLOYED_NOT_READY;
                 }
                 LOG.info(
                         "JobManager deployment {} in namespace {} not yet ready, status {}",
                         flinkApp.getMetadata().getName(),
                         flinkApp.getMetadata().getNamespace(),
                         status);
-                // TODO: how frequently do we want here
-                return UpdateControl.updateStatus(flinkApp)
-                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+                return JobDeploymentStatus.DEPLOYING;
             }
         }
-        return null;
+        return JobDeploymentStatus.MISSING;

Review comment:
       `MISSING` is confusing here. To my mind, `MISSING` means that the JobManager deployment could not be found, and it should also trigger a reconciliation after `REFRESH_SECONDS`.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -86,16 +85,14 @@ public FlinkDeploymentController(
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-        LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName());
-        FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
-        IngressUtils.updateIngressRules(
-                flinkApp,
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()),
-                operatorNamespace,
-                kubernetesClient,
-                true);
+        LOG.info("Stopping cluster {}", flinkApp.getMetadata().getName());
         getReconciler(flinkApp).removeDeployment(flinkApp);

Review comment:
       We already remove the deployment in `BaseReconciler#shutdownAndDelete()`. Right?

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -74,11 +72,14 @@ public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkServic
                     Optional.ofNullable(jobSpec.getInitialSavepointPath()));
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
+            return UpdateControl.updateStatus(flinkApp)

Review comment:
       Maybe we need to return `JobDeploymentStatus.DEPLOYING.toUpdateControl(flinkApp)` here, since we have just called `deployFlinkJob` above and the JobManager deployment is still deploying.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -179,4 +180,22 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
+
+    @Override
+    protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        if (flinkApp.getStatus().getJobStatus() != null
+                && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+            try {
+                flinkService.cancelJob(
+                        JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
+                        UpgradeMode.STATELESS,
+                        effectiveConfig);
+                return;
+            } catch (Exception e) {
+                LOG.error("Could not shut down cluster gracefully, deleting...", e);
+            }
+        }
+
+        FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);

Review comment:
       Do we only need to manually delete the cluster when the cancellation fails?

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
##########
@@ -171,14 +171,14 @@ public boolean isJobManagerPortReady(Configuration config) {
         return savepointOpt;
     }
 
-    public void stopSessionCluster(FlinkDeployment deployment, Configuration conf)
-            throws Exception {
-        FlinkUtils.deleteCluster(deployment, kubernetesClient);
+    public void stopSessionCluster(
+            FlinkDeployment deployment, Configuration conf, boolean deleteHaData) {
+        FlinkUtils.deleteCluster(deployment, kubernetesClient, deleteHaData);
         waitForClusterShutdown(conf);
     }
 
     /** We need this due to the buggy flink kube cluster client behaviour for now. */

Review comment:
       Out of scope for this PR.
   
   I am curious what the bug in the Flink Kubernetes cluster client is, so that we can fix it in the upstream project.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,11 +65,8 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
             Configuration effectiveConfig)
             throws Exception;
 
-    protected UpdateControl<FlinkDeployment> checkJobManagerDeployment(
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig,
-            FlinkService flinkService) {
+    protected JobDeploymentStatus checkJobManagerDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
         if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {

Review comment:
       Out of scope for this PR.
   
   Could we check the JobManager deployment on every reconciliation instead of using the cached `jobManagerDeployments`? We already use an informer to cache the deployments locally, so it is not a heavy operation. After that, we could also handle the JobManager pod crash scenario.







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817609226



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    deploymentStatus.setJobManagerDeploymentStatus(

Review comment:
       What will happen if all the replicas are running but the REST port is not reachable? We will end up with the `MISSING` deployment status, not `DEPLOYED_NOT_READY`.







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817487152



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -54,81 +47,22 @@ public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkServi
         this.flinkService = flinkService;
     }
 
-    public boolean removeDeployment(FlinkDeployment flinkApp) {
-        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
-    }
-
     public abstract UpdateControl<FlinkDeployment> reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
             Configuration effectiveConfig)
             throws Exception;
 
-    protected JobManagerDeploymentStatus checkJobManagerDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
-            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
-            if (deployment.isPresent()) {
-                DeploymentStatus status = deployment.get().getStatus();
-                DeploymentSpec spec = deployment.get().getSpec();
-                if (status != null
-                        && status.getAvailableReplicas() != null
-                        && spec.getReplicas().intValue() == status.getReplicas()
-                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
-                    // typically it takes a few seconds for the REST server to be ready
-                    if (flinkService.isJobManagerPortReady(effectiveConfig)) {
-                        LOG.info(
-                                "JobManager deployment {} in namespace {} is ready",
-                                flinkApp.getMetadata().getName(),
-                                flinkApp.getMetadata().getNamespace());
-                        jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return JobManagerDeploymentStatus.READY;
-                        }
-                    }
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} port not ready",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace());
-                    return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
-                }
-                LOG.info(
-                        "JobManager deployment {} in namespace {} not yet ready, status {}",
-                        flinkApp.getMetadata().getName(),
-                        flinkApp.getMetadata().getNamespace(),
-                        status);
-
-                return JobManagerDeploymentStatus.DEPLOYING;
-            }
-            return JobManagerDeploymentStatus.MISSING;
-        }
-        return JobManagerDeploymentStatus.READY;
-    }
-
-    /**
-     * Shuts down the job and deletes all kubernetes resources including k8s HA resources. It will
-     * first perform a graceful shutdown (cancel) before deleting the data if that is unsuccessful
-     * in order to avoid leaking HA metadata to durable storage.
-     *
-     * <p>This feature is limited at the moment to cleaning up native kubernetes HA resources, other
-     * HA providers like ZK need to be cleaned up manually after deletion.
-     */
     public DeleteControl shutdownAndDelete(
-            String operatorNamespace,
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig) {
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
 
-        if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                == JobManagerDeploymentStatus.READY) {
+        if (JobManagerDeploymentStatus.READY
+                == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
             shutdown(flinkApp, effectiveConfig);

Review comment:
       Does this need to delete the cluster after shutdown?







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817650866



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    deploymentStatus.setJobManagerDeploymentStatus(

Review comment:
       Exactly, that's how it works, and it is also how it worked before: the status stays in DEPLOYING until the replicas and ports are ready, and then it moves to DEPLOYED_NOT_READY. The name might not be the best, but it means that we need to do one more iteration because the REST API probably needs a few more seconds (which we cannot explicitly check).
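
The transitions described in this reply can be sketched as a small pure function. This is a simplified model under stated assumptions, not the actual `Observer` code: the `Status` enum mirrors the `JobManagerDeploymentStatus` values seen in the diff, while `next` and its three boolean inputs are hypothetical names introduced for illustration.

```java
/** Sketch only: a simplified model of the status transitions described in the reply. */
class JmStatusSketch {

    /** Mirrors the JobManagerDeploymentStatus values that appear in the quoted diff. */
    enum Status {
        MISSING,
        DEPLOYING,
        DEPLOYED_NOT_READY,
        READY
    }

    /**
     * Computes the next status from the previous one and what was observed.
     * The boolean parameters are hypothetical inputs standing in for the
     * Deployment lookup, the replica check and the port check.
     */
    static Status next(
            Status previous,
            boolean deploymentExists,
            boolean replicasAvailable,
            boolean portReady) {
        if (previous == Status.READY) {
            return Status.READY; // nothing more to observe on the deployment
        }
        if (previous == Status.DEPLOYED_NOT_READY) {
            // One extra iteration has passed, giving the REST API time to come up.
            return Status.READY;
        }
        if (!deploymentExists) {
            return Status.MISSING;
        }
        if (replicasAvailable && portReady) {
            return Status.DEPLOYED_NOT_READY;
        }
        return Status.DEPLOYING;
    }

    public static void main(String[] args) {
        Status s = Status.MISSING;
        s = next(s, true, false, false); // deployment created, replicas not up yet
        System.out.println(s);
        s = next(s, true, true, true); // replicas and port are ready
        System.out.println(s);
        s = next(s, true, true, true); // one more iteration
        System.out.println(s);
    }
}
```

Modelling the extra iteration as its own state keeps the waiting period visible in the status, at the cost of a name that reads a little oddly.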







[GitHub] [flink-kubernetes-operator] SteNicholas commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817508014



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            FlinkDeploymentSpec lastReconciledSpec =
+                    deploymentStatus.getReconciliationStatus().getLastReconciledSpec();
+
+            // If the job was successfully suspended we set to MISSING status, otherwise keep READY.
+            if (lastReconciledSpec != null
+                    && lastReconciledSpec.getJob() != null
+                    && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED) {
+
+                deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+            }
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    deploymentStatus.setJobManagerDeploymentStatus(
+                            JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                    return;
+                } else {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(

Review comment:
       @gyfora, this makes sense to me.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817690157



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
##########
@@ -26,6 +28,8 @@
 @NoArgsConstructor
 @AllArgsConstructor
 public class FlinkDeploymentStatus {
-    private JobStatus jobStatus;
+    private JobStatus jobStatus = new JobStatus();

Review comment:
       Not a crucial one: having a default value for the `jobStatus` field or not both make sense to me. But I am curious whether there is a specific reason for adding it here. If there is, maybe we can refactor `createJobStatus()` in Observer.java into a static method here, like `parseFromJobMessage()`. In any case, this is not critical for this PR and we can leave it as a follow-up if we want.
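
A rough sketch of what such a static factory could look like is below. All types here are hypothetical stand-ins: `JobMessage` plays the role of `JobStatusMessage`, and the three fields on `JobStatus` are assumed for illustration and are not taken from the operator's CRD classes. Only the method name `parseFromJobMessage` comes from the comment above.

```java
/** Sketch only: hypothetical stand-ins, not the operator's actual CRD classes. */
class JobStatusFactorySketch {

    /** Hypothetical stand-in for Flink's JobStatusMessage. */
    static final class JobMessage {
        final String jobId;
        final String jobName;
        final String state;

        JobMessage(String jobId, String jobName, String state) {
            this.jobId = jobId;
            this.jobName = jobName;
            this.state = state;
        }
    }

    /** Hypothetical status object; the field set is assumed for illustration. */
    static final class JobStatus {
        String jobId;
        String jobName;
        String state;

        /** Builds a status from a cluster job message, keeping construction in one place. */
        static JobStatus parseFromJobMessage(JobMessage message) {
            JobStatus status = new JobStatus();
            status.jobId = message.jobId;
            status.jobName = message.jobName;
            status.state = message.state;
            return status;
        }
    }

    public static void main(String[] args) {
        JobStatus status =
                JobStatus.parseFromJobMessage(new JobMessage("abc123", "example-job", "RUNNING"));
        System.out.println(status.jobName + " is " + status.state);
    }
}
```

Placing the conversion on the status type would let the observer stay focused on querying the cluster, though whether that is worth the move is a judgment call for a follow-up.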







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817839391



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()
+                    && flinkService.isJobManagerPortReady(effectiveConfig)) {
+
+                // typically it takes a few seconds for the REST server to be ready
+                LOG.info(
+                        "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                        flinkApp.getMetadata().getName(),
+                        flinkApp.getMetadata().getNamespace());
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                return;
+            }
+            LOG.info(
+                    "JobManager deployment {} in namespace {} exists but not ready yet, status {}",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace(),
+                    status);
+
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+            return;
+        }
+
+        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+    }
+
+    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+
+        // No need to observe job status for session clusters
+        if (flinkApp.getSpec().getJob() == null) {
+            return true;
+        }
+
+        LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
+        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+
+        Collection<JobStatusMessage> clusterJobStatuses;
+        try {
+            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+        } catch (Exception e) {
+            LOG.error("Exception while listing jobs", e);
+            flinkAppStatus.getJobStatus().setState("UNKNOWN");

Review comment:
       I added this so that I can simply call observe when we delete the Flink resource and only try to cancel if the job is RUNNING.
   
   UNKNOWN does not say whether the job is running or failing; it merely means that the operator cannot access it, so Flink client operations are not possible. It might not be perfect, but I think it makes sense.
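   
   A minimal sketch of the deletion guard described here, assuming the observed job state is kept as a plain string. The class and method names (`DeletionSketch`, `shouldCancelOnDelete`) are hypothetical and not part of the operator code base.

```java
/** Hypothetical sketch; names are illustrative and not taken from the operator. */
class DeletionSketch {

    /** Assumed string form of the state written by the observer. */
    static final String RUNNING = "RUNNING";

    /** Returns true only when the last observation saw a running job. */
    static boolean shouldCancelOnDelete(String observedState) {
        // UNKNOWN (or a missing state) means the cluster could not be reached,
        // so a cancel call through the Flink client is not attempted.
        return RUNNING.equals(observedState);
    }
}
```

   With this guard, a resource whose last observed state is UNKNOWN is deleted without a cancel attempt, which avoids waiting on a cluster that is not reachable.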







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817537191



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    deploymentStatus.setJobManagerDeploymentStatus(

Review comment:
       I think we changed the old behavior here. Before this change, we set `READY` only when `flinkApp.getStatus().getJobStatus() != null`; otherwise, `JobManagerDeploymentStatus` was set to `DEPLOYED_NOT_READY`.
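   
   The difference can be sketched as follows. This is only an illustration of the two rules as this comment describes them, assuming both are evaluated once the JobManager port has been seen ready; `ReadinessRuleSketch`, `oldRule` and `newRule` are hypothetical names.

```java
/** Hypothetical comparison of the two readiness rules; not the operator's actual code. */
class ReadinessRuleSketch {

    enum Status { READY, DEPLOYED_NOT_READY }

    /** Old rule as described in the comment: READY requires an already observed job status. */
    static Status oldRule(Object jobStatus) {
        return jobStatus != null ? Status.READY : Status.DEPLOYED_NOT_READY;
    }

    /** Rule in this diff: DEPLOYED_NOT_READY is promoted to READY on the next observation. */
    static Status newRule(Object jobStatus) {
        // The job status is no longer consulted for the promotion.
        return Status.READY;
    }
}
```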

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+
+                if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                    deploymentStatus.setJobManagerDeploymentStatus(
+                            JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                    return;
+                } else {
+                    // typically it takes a few seconds for the REST server to be ready
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port not ready",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                }
+            }
+            LOG.info(
+                    "JobManager deployment {} in namespace {} not yet ready, status {}",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace(),
+                    status);
+
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+            return;
+        }
+
+        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        return;

Review comment:
       nit: Unnecessary `return`.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(

Review comment:
       Currently, `observeJmDeployment` still does not cover some scenarios, e.g. the JobManager deployment crashed or was deleted externally. Once `JobManagerDeploymentStatus` reaches `READY`, it will always stay `READY`.
   
   Since this is the same behavior as before, I agree to do the improvement in follow-up tickets.
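   
   To illustrate the sticky `READY` state, here is a simplified model of the transitions in the diff above. It is a sketch only: the enum mirrors the four values used in `observeJmDeployment`, while `JmStatusSketch`, `next` and its boolean parameters are hypothetical stand-ins for the real Kubernetes and REST checks.

```java
/** Simplified model of the transitions in observeJmDeployment; not the operator's class. */
class JmStatusSketch {

    enum Status { READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING }

    /**
     * Computes the next status from the previous one and two observed facts:
     * whether the deployment exists, and whether replicas and port are ready.
     */
    static Status next(Status previous, boolean deploymentPresent, boolean replicasAndPortReady) {
        if (previous == Status.READY) {
            // Sticky: a later crash or external delete is never observed.
            return Status.READY;
        }
        if (previous == Status.DEPLOYED_NOT_READY) {
            return Status.READY;
        }
        if (deploymentPresent) {
            return replicasAndPortReady ? Status.DEPLOYED_NOT_READY : Status.DEPLOYING;
        }
        return Status.MISSING;
    }
}
```

   In this model, `next(Status.READY, false, false)` still yields `READY` even though the deployment is gone, which is the gap a follow-up ticket would need to close.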

##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
##########
@@ -21,57 +21,107 @@
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
+import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** @link JobStatusObserver unit tests */
-public class JobStatusObserverTest {
+/** @link Observer unit tests */
+public class ObserverTest {
+
+    private final Context readyContext =
+            JobReconcilerTest.createContextWithReadyJobManagerDeployment();
 
     @Test
     public void observeSessionCluster() throws Exception {

Review comment:
       This test does not throw an exception, so the `throws Exception` clause can be dropped.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -174,15 +159,17 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
                         effectiveConfig);
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
         jobStatus.setState("suspended");
-        removeDeployment(flinkApp);
+        flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
 
     @Override
     protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        if (flinkApp.getStatus().getJobStatus() != null
-                && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+        if (org.apache.flink.api.common.JobStatus.RUNNING

Review comment:
       Do we need to ignore case here?
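   
   The concern is that the state is stored as a string (the hunk above writes the lowercase `"suspended"`), while the enum constant name is upper case. A case-insensitive comparison could look like the sketch below; `JobState` is a local stand-in for `org.apache.flink.api.common.JobStatus`, and `StateCompareSketch` and `isRunning` are hypothetical names.

```java
/** Hypothetical sketch of a case-insensitive state check. */
class StateCompareSketch {

    /** Local stand-in for the Flink job status enum, reduced to two values. */
    enum JobState { RUNNING, SUSPENDED }

    /** Returns true if the stored state string denotes a running job, ignoring case. */
    static boolean isRunning(String state) {
        // equalsIgnoreCase returns false for null, so no extra null check is needed.
        return JobState.RUNNING.name().equalsIgnoreCase(state);
    }
}
```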







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815592072



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+    READY,

Review comment:
       Makes sense, I will add some comments to the values.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26432] Cleanly separate validator, observer and reconciler modules

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #26:
URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r817137800



##########
File path: helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
##########
@@ -9073,6 +9073,13 @@ spec:
                   savepointLocation:
                     type: string
                 type: object
+              jobManagerDeploymentStatus:
+                enum:

Review comment:
       @tweise you will probably need to add a special status for deployment errors. I have not added a state for that, as we have no logic in place for it yet.
   
   Other than that, we can probably reuse the reconciliation status error field for the actual error message.



