You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/18 13:36:47 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26605] Check if JM can serve rest api calls every time before reconcile
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c20228a [FLINK-26605] Check if JM can serve rest api calls every time before reconcile
c20228a is described below
commit c20228a96700a5efe41cd31cb23db1eca20ccd86
Author: Biao Geng <80...@users.noreply.github.com>
AuthorDate: Fri Mar 18 21:32:45 2022 +0800
[FLINK-26605] Check if JM can serve rest api calls every time before reconcile
---
.github/workflows/ci.yml | 4 +--
Dockerfile | 2 +-
.../config/FlinkOperatorConfiguration.java | 5 +++
.../operator/config/OperatorConfigOptions.java | 7 ++++
.../operator/observer/SessionObserver.java | 17 ++++++++-
.../kubernetes/operator/service/FlinkService.java | 6 +++-
.../kubernetes/operator/TestingFlinkService.java | 6 +++-
.../controller/FlinkDeploymentControllerTest.java | 1 +
.../operator/observer/JobObserverTest.java | 16 +++++++++
.../operator/observer/SessionObserverTest.java | 40 +++++++++++++++++-----
10 files changed, 89 insertions(+), 15 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 269010b..d1a86a7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -52,7 +52,7 @@ jobs:
export SHELL=/bin/bash
export DOCKER_BUILDKIT=1
eval $(minikube -p minikube docker-env)
- docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
+ docker build --progress=plain --no-cache -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
docker images
- name: Start the operator
run: |
@@ -107,7 +107,7 @@ jobs:
export SHELL=/bin/bash
export DOCKER_BUILDKIT=1
eval $(minikube -p minikube docker-env)
- docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
+ docker build --progress=plain --no-cache -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
docker images
- name: Start the operator
run: |
diff --git a/Dockerfile b/Dockerfile
index 2b6655f..adeaaf3 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -35,7 +35,7 @@ COPY $WEBHOOK_DIR/src ./$WEBHOOK_DIR/src
COPY tools ./tools
-RUN --mount=type=cache,target=/root/.m2 mvn clean install
+RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install
# stage
FROM openjdk:11-jre
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 016e494..4b273f1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -35,6 +35,7 @@ public class FlinkOperatorConfiguration {
Duration progressCheckInterval;
Duration restApiReadyDelay;
Duration savepointTriggerGracePeriod;
+ Duration flinkClientTimeout;
String flinkServiceHostOverride;
Set<String> watchedNamespaces;
@@ -52,6 +53,9 @@ public class FlinkOperatorConfiguration {
operatorConfig.get(
OperatorConfigOptions.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD);
+ Duration flinkClientTimeout =
+ operatorConfig.get(OperatorConfigOptions.OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT);
+
String flinkServiceHostOverride = null;
if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) {
// not running in k8s, simplify local development
@@ -65,6 +69,7 @@ public class FlinkOperatorConfiguration {
progressCheckInterval,
restApiReadyDelay,
savepointTriggerGracePeriod,
+ flinkClientTimeout,
flinkServiceHostOverride,
watchedNamespaces);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
index 67f47e7..9fbaca3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -53,4 +53,11 @@ public class OperatorConfigOptions {
.defaultValue(Duration.ofSeconds(10))
.withDescription(
"The interval before a savepoint trigger attempt is marked as unsuccessful");
+
+ public static final ConfigOption<Duration> OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT =
+ ConfigOptions.key("operator.observer.flink.client.timeout.sec")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
+ .withDescription(
+ "The timeout for the observer to wait for the Flink REST client to return.");
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
index 5cb594c..e98e7e2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
@@ -25,6 +25,8 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
import io.javaoperatorsdk.operator.api.reconciler.Context;
+import java.util.concurrent.TimeoutException;
+
/** The observer of the {@link org.apache.flink.kubernetes.operator.config.Mode#SESSION} cluster. */
public class SessionObserver extends BaseObserver {
@@ -35,6 +37,19 @@ public class SessionObserver extends BaseObserver {
@Override
public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
- observeJmDeployment(flinkApp, context, effectiveConfig);
+ if (isClusterReady(flinkApp)) {
+ // Check if session cluster can serve rest calls following our practice in JobObserver
+ try {
+ flinkService.listJobs(effectiveConfig);
+ } catch (Exception e) {
+ logger.error("REST service in session cluster is bad now", e);
+ if (e instanceof TimeoutException) {
+ // check for problems with the underlying deployment
+ observeJmDeployment(flinkApp, context, effectiveConfig);
+ }
+ }
+ } else {
+ observeJmDeployment(flinkApp, context, effectiveConfig);
+ }
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c3faf97..568367f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -138,7 +138,11 @@ public class FlinkService {
public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
- return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+ return clusterClient
+ .listJobs()
+ .get(
+ operatorConfiguration.getFlinkClientTimeout().toMillis(),
+ TimeUnit.MILLISECONDS);
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 5f034d2..d6d6d6d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/** Flink service mock for tests. */
@@ -79,9 +80,12 @@ public class TestingFlinkService extends FlinkService {
@Override
public List<JobStatusMessage> listJobs(Configuration conf) throws Exception {
- if (jobs.isEmpty()) {
+ if (jobs.isEmpty() && !sessions.isEmpty()) {
throw new Exception("Trying to list a job without submitting it");
}
+ if (!isPortReady) {
+ throw new TimeoutException("JM port is unavailable");
+ }
return jobs.stream().map(t -> t.f1).collect(Collectors.toList());
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 6093a8e..24d8f3f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -78,6 +78,7 @@ public class FlinkDeploymentControllerTest {
Duration.ofSeconds(2),
Duration.ofSeconds(3),
Duration.ofSeconds(4),
+ Duration.ofSeconds(5),
null,
Collections.emptySet());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index 8a90cf2..2db9425 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -106,6 +106,22 @@ public class JobObserverTest {
.getJobStatus()
.getStartTime()))
>= 0);
+ // Test job manager is unavailable suddenly
+ flinkService.setPortReady(false);
+ observer.observe(deployment, readyContext, conf);
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ // Job manager recovers
+ flinkService.setPortReady(true);
+ observer.observe(deployment, readyContext, conf);
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ observer.observe(deployment, readyContext, conf);
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
// Test listing failure
flinkService.clear();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
index 1b79649..2719438 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -48,34 +48,53 @@ public class SessionObserverTest {
@Test
public void observeSessionCluster() {
- FlinkService flinkService = new TestingFlinkService();
+ TestingFlinkService flinkService = new TestingFlinkService();
SessionObserver observer =
new SessionObserver(
flinkService,
FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
FlinkDeployment deployment = TestUtils.buildSessionCluster();
+ Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
deployment
.getStatus()
.getReconciliationStatus()
.setLastReconciledSpec(deployment.getSpec());
- observer.observe(
- deployment,
- readyContext,
- FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- observer.observe(
- deployment,
- readyContext,
- FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+ observer.observe(deployment, readyContext, conf);
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
+ // Observe again, the JM should be READY
+ observer.observe(deployment, readyContext, conf);
+
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ // Test job manager is unavailable suddenly
+ flinkService.setPortReady(false);
+ observer.observe(deployment, readyContext, conf);
+
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ // Job manager recovers
+ flinkService.setPortReady(true);
+ observer.observe(deployment, readyContext, conf);
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ observer.observe(deployment, readyContext, conf);
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
}
@Test
@@ -93,6 +112,7 @@ public class SessionObserverTest {
Duration.ofSeconds(2),
Duration.ofSeconds(3),
Duration.ofSeconds(4),
+ Duration.ofSeconds(5),
null,
Collections.emptySet());
FlinkOperatorConfiguration specificNsConfig =
@@ -101,6 +121,7 @@ public class SessionObserverTest {
Duration.ofSeconds(2),
Duration.ofSeconds(3),
Duration.ofSeconds(4),
+ Duration.ofSeconds(5),
null,
Set.of(deployment.getMetadata().getNamespace()));
FlinkOperatorConfiguration multipleNsConfig =
@@ -109,6 +130,7 @@ public class SessionObserverTest {
Duration.ofSeconds(2),
Duration.ofSeconds(3),
Duration.ofSeconds(4),
+ Duration.ofSeconds(5),
null,
Set.of(deployment.getMetadata().getNamespace(), "ns"));