You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/02/25 12:24:56 UTC

[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #21: [FLINK-26261] Wait for JobManager deployment ready before accessing REST API

wangyang0918 commented on a change in pull request #21:
URL: https://github.com/apache/flink-kubernetes-operator/pull/21#discussion_r814713388



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -145,11 +190,16 @@ private void updateForReconciliationError(FlinkDeployment flinkApp, String err)
     @Override
     public List<EventSource> prepareEventSources(
             EventSourceContext<FlinkDeployment> eventSourceContext) {
-        // TODO: start status updated
-        //        return List.of(new PerResourcePollingEventSource<>(
-        //                new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD,
-        //                FlinkApplication.class));
-        return Collections.emptyList();
+        // reconcile when job manager deployment and REST API are ready
+        SharedIndexInformer<Deployment> deploymentInformer =
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inAnyNamespace()
+                        .withLabel("type", "flink-native-kubernetes")

Review comment:
       Using constants in `org.apache.flink.kubernetes.utils.Constants` could make us aware of upstream changes.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -114,6 +126,39 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         } catch (Exception e) {
             throw new ReconciliationException(e);
         }
+
+        return checkDeployment(flinkApp, context);
+    }
+
+    private UpdateControl<FlinkDeployment> checkDeployment(
+            FlinkDeployment flinkApp, Context context) {
+        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())) {

Review comment:
       Why do we use `selfLink` here? I am afraid the `selfLink` field does not always exist.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -145,11 +190,16 @@ private void updateForReconciliationError(FlinkDeployment flinkApp, String err)
     @Override
     public List<EventSource> prepareEventSources(
             EventSourceContext<FlinkDeployment> eventSourceContext) {
-        // TODO: start status updated
-        //        return List.of(new PerResourcePollingEventSource<>(
-        //                new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD,
-        //                FlinkApplication.class));
-        return Collections.emptyList();
+        // reconcile when job manager deployment and REST API are ready
+        SharedIndexInformer<Deployment> deploymentInformer =
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inAnyNamespace()

Review comment:
       Maybe we do not need to watch all the namespaces if `FLINK_OPERATOR_WATCH_NAMESPACES` configured.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -114,6 +126,39 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         } catch (Exception e) {
             throw new ReconciliationException(e);
         }
+
+        return checkDeployment(flinkApp, context);
+    }
+
+    private UpdateControl<FlinkDeployment> checkDeployment(
+            FlinkDeployment flinkApp, Context context) {
+        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())) {
+            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+            if (deployment.isPresent()) {
+                DeploymentStatus status = deployment.get().getStatus();
+                DeploymentSpec spec = deployment.get().getSpec();
+                if (status != null
+                        && status.getAvailableReplicas() != null
+                        && spec.getReplicas().intValue() == status.getReplicas()
+                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} is ready",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    jobManagerDeployments.add(flinkApp.getMetadata().getSelfLink());

Review comment:
       Do we need to remove the current flinkApp from cache `jobManagerDeployments` when the replicas is not enough? For example, the JobManager crashed for a while.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org