You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/18 13:36:47 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c20228a  [FLINK-26605] Check if JM can serve rest api calls every time before reconcile
c20228a is described below

commit c20228a96700a5efe41cd31cb23db1eca20ccd86
Author: Biao Geng <80...@users.noreply.github.com>
AuthorDate: Fri Mar 18 21:32:45 2022 +0800

    [FLINK-26605] Check if JM can serve rest api calls every time before reconcile
---
 .github/workflows/ci.yml                           |  4 +--
 Dockerfile                                         |  2 +-
 .../config/FlinkOperatorConfiguration.java         |  5 +++
 .../operator/config/OperatorConfigOptions.java     |  7 ++++
 .../operator/observer/SessionObserver.java         | 17 ++++++++-
 .../kubernetes/operator/service/FlinkService.java  |  6 +++-
 .../kubernetes/operator/TestingFlinkService.java   |  6 +++-
 .../controller/FlinkDeploymentControllerTest.java  |  1 +
 .../operator/observer/JobObserverTest.java         | 16 +++++++++
 .../operator/observer/SessionObserverTest.java     | 40 +++++++++++++++++-----
 10 files changed, 89 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 269010b..d1a86a7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -52,7 +52,7 @@ jobs:
           export SHELL=/bin/bash
           export DOCKER_BUILDKIT=1
           eval $(minikube -p minikube docker-env)
-          docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
+          docker build --progress=plain --no-cache -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
           docker images
       - name: Start the operator
         run: |
@@ -107,7 +107,7 @@ jobs:
           export SHELL=/bin/bash
           export DOCKER_BUILDKIT=1
           eval $(minikube -p minikube docker-env)
-          docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
+          docker build --progress=plain --no-cache -f ./Dockerfile -t flink-kubernetes-operator:ci-latest --progress plain .
           docker images
       - name: Start the operator
         run: |
diff --git a/Dockerfile b/Dockerfile
index 2b6655f..adeaaf3 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -35,7 +35,7 @@ COPY $WEBHOOK_DIR/src ./$WEBHOOK_DIR/src
 
 COPY tools ./tools
 
-RUN --mount=type=cache,target=/root/.m2 mvn clean install
+RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install
 
 # stage
 FROM openjdk:11-jre
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 016e494..4b273f1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -35,6 +35,7 @@ public class FlinkOperatorConfiguration {
     Duration progressCheckInterval;
     Duration restApiReadyDelay;
     Duration savepointTriggerGracePeriod;
+    Duration flinkClientTimeout;
     String flinkServiceHostOverride;
     Set<String> watchedNamespaces;
 
@@ -52,6 +53,9 @@ public class FlinkOperatorConfiguration {
                 operatorConfig.get(
                         OperatorConfigOptions.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD);
 
+        Duration flinkClientTimeout =
+                operatorConfig.get(OperatorConfigOptions.OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT);
+
         String flinkServiceHostOverride = null;
         if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) {
             // not running in k8s, simplify local development
@@ -65,6 +69,7 @@ public class FlinkOperatorConfiguration {
                 progressCheckInterval,
                 restApiReadyDelay,
                 savepointTriggerGracePeriod,
+                flinkClientTimeout,
                 flinkServiceHostOverride,
                 watchedNamespaces);
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
index 67f47e7..9fbaca3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -53,4 +53,11 @@ public class OperatorConfigOptions {
                     .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "The interval before a savepoint trigger attempt is marked as unsuccessful");
+
+    public static final ConfigOption<Duration> OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT =
+            ConfigOptions.key("operator.observer.flink.client.timeout.sec")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription(
+                            "The timeout for the observer to wait the flink rest client to return.");
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
index 5cb594c..e98e7e2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
@@ -25,6 +25,8 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 
+import java.util.concurrent.TimeoutException;
+
 /** The observer of the {@link org.apache.flink.kubernetes.operator.config.Mode#SESSION} cluster. */
 public class SessionObserver extends BaseObserver {
 
@@ -35,6 +37,19 @@ public class SessionObserver extends BaseObserver {
 
     @Override
     public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        observeJmDeployment(flinkApp, context, effectiveConfig);
+        if (isClusterReady(flinkApp)) {
+            // Check whether the session cluster can serve REST calls, as is done in JobObserver
+            try {
+                flinkService.listJobs(effectiveConfig);
+            } catch (Exception e) {
+                logger.error("REST service in session cluster is bad now", e);
+                if (e instanceof TimeoutException) {
+                    // check for problems with the underlying deployment
+                    observeJmDeployment(flinkApp, context, effectiveConfig);
+                }
+            }
+        } else {
+            observeJmDeployment(flinkApp, context, effectiveConfig);
+        }
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c3faf97..568367f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -138,7 +138,11 @@ public class FlinkService {
 
     public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            return clusterClient
+                    .listJobs()
+                    .get(
+                            operatorConfiguration.getFlinkClientTimeout().getSeconds(),
+                            TimeUnit.SECONDS);
         }
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 5f034d2..d6d6d6d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 /** Flink service mock for tests. */
@@ -79,9 +80,12 @@ public class TestingFlinkService extends FlinkService {
 
     @Override
     public List<JobStatusMessage> listJobs(Configuration conf) throws Exception {
-        if (jobs.isEmpty()) {
+        if (jobs.isEmpty() && !sessions.isEmpty()) {
             throw new Exception("Trying to list a job without submitting it");
         }
+        if (!isPortReady) {
+            throw new TimeoutException("JM port is unavailable");
+        }
         return jobs.stream().map(t -> t.f1).collect(Collectors.toList());
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 6093a8e..24d8f3f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -78,6 +78,7 @@ public class FlinkDeploymentControllerTest {
                     Duration.ofSeconds(2),
                     Duration.ofSeconds(3),
                     Duration.ofSeconds(4),
+                    Duration.ofSeconds(5),
                     null,
                     Collections.emptySet());
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index 8a90cf2..2db9425 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -106,6 +106,22 @@ public class JobObserverTest {
                                                         .getJobStatus()
                                                         .getStartTime()))
                         >= 0);
+        // Test that the job manager suddenly becomes unavailable
+        flinkService.setPortReady(false);
+        observer.observe(deployment, readyContext, conf);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+        // Job manager recovers
+        flinkService.setPortReady(true);
+        observer.observe(deployment, readyContext, conf);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+        observer.observe(deployment, readyContext, conf);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
 
         // Test listing failure
         flinkService.clear();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
index 1b79649..2719438 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -48,34 +48,53 @@ public class SessionObserverTest {
 
     @Test
     public void observeSessionCluster() {
-        FlinkService flinkService = new TestingFlinkService();
+        TestingFlinkService flinkService = new TestingFlinkService();
         SessionObserver observer =
                 new SessionObserver(
                         flinkService,
                         FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
         deployment
                 .getStatus()
                 .getReconciliationStatus()
                 .setLastReconciledSpec(deployment.getSpec());
 
-        observer.observe(
-                deployment,
-                readyContext,
-                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+        observer.observe(deployment, readyContext, conf);
 
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
-        observer.observe(
-                deployment,
-                readyContext,
-                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+        observer.observe(deployment, readyContext, conf);
 
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
+        // Observe again, the JM should be READY
+        observer.observe(deployment, readyContext, conf);
+
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        // Test that the job manager suddenly becomes unavailable
+        flinkService.setPortReady(false);
+        observer.observe(deployment, readyContext, conf);
+
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+        // Job manager recovers
+        flinkService.setPortReady(true);
+        observer.observe(deployment, readyContext, conf);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+        observer.observe(deployment, readyContext, conf);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
     }
 
     @Test
@@ -93,6 +112,7 @@ public class SessionObserverTest {
                         Duration.ofSeconds(2),
                         Duration.ofSeconds(3),
                         Duration.ofSeconds(4),
+                        Duration.ofSeconds(5),
                         null,
                         Collections.emptySet());
         FlinkOperatorConfiguration specificNsConfig =
@@ -101,6 +121,7 @@ public class SessionObserverTest {
                         Duration.ofSeconds(2),
                         Duration.ofSeconds(3),
                         Duration.ofSeconds(4),
+                        Duration.ofSeconds(5),
                         null,
                         Set.of(deployment.getMetadata().getNamespace()));
         FlinkOperatorConfiguration multipleNsConfig =
@@ -109,6 +130,7 @@ public class SessionObserverTest {
                         Duration.ofSeconds(2),
                         Duration.ofSeconds(3),
                         Duration.ofSeconds(4),
+                        Duration.ofSeconds(5),
                         null,
                         Set.of(deployment.getMetadata().getNamespace(), "ns"));