You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/14 12:19:38 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26546] Extract Observer Interface
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 890547f [FLINK-26546] Extract Observer Interface
890547f is described below
commit 890547f063e75f5929d2809e1cfe8895d0d2ac9f
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Sun Mar 13 20:27:07 2022 +0800
[FLINK-26546] Extract Observer Interface
---
.../flink/kubernetes/operator/FlinkOperator.java | 12 +-
.../flink/kubernetes/operator/config/Mode.java | 31 ++++
.../controller/FlinkDeploymentController.java | 14 +-
.../kubernetes/operator/observer/BaseObserver.java | 117 +++++++++++++
.../kubernetes/operator/observer/JobObserver.java | 126 ++++++++++++++
.../kubernetes/operator/observer/Observer.java | 191 ++-------------------
.../ObserverFactory.java} | 42 ++---
.../operator/observer/SessionObserver.java | 40 +++++
.../operator/reconciler/JobReconciler.java | 2 +-
.../operator/reconciler/ReconcilerFactory.java | 12 +-
.../controller/FlinkDeploymentControllerTest.java | 7 +-
.../{ObserverTest.java => JobObserverTest.java} | 48 +-----
.../operator/observer/SessionObserverTest.java | 69 ++++++++
13 files changed, 433 insertions(+), 278 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 7e46b0c..1125b65 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -57,12 +57,10 @@ public class FlinkOperator {
FlinkService flinkService = new FlinkService(client);
FlinkOperatorConfiguration operatorConfiguration =
FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
-
- Observer observer = new Observer(flinkService, operatorConfiguration);
-
FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
- ReconcilerFactory factory =
+ ReconcilerFactory reconcilerFactory =
new ReconcilerFactory(client, flinkService, operatorConfiguration);
+ ObserverFactory observerFactory = new ObserverFactory(flinkService, operatorConfiguration);
FlinkDeploymentController controller =
new FlinkDeploymentController(
@@ -71,8 +69,8 @@ public class FlinkOperator {
client,
namespace,
validator,
- observer,
- factory);
+ reconcilerFactory,
+ observerFactory);
FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
controller.setControllerConfig(controllerConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
new file mode 100644
index 0000000..e616432
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+/**
+ * Deployment mode of a {@link FlinkDeployment}, derived from whether its spec contains a job.
+ */
+public enum Mode {
+    /** Selected when the deployment spec contains a job. */
+    APPLICATION,
+    /** Selected when the deployment spec contains no job. */
+    SESSION;
+
+    /**
+     * Determines the mode of the given deployment.
+     *
+     * @param flinkApp the deployment to inspect
+     * @return {@link #SESSION} if the spec has no job, otherwise {@link #APPLICATION}
+     */
+    public static Mode getMode(FlinkDeployment flinkApp) {
+        if (flinkApp.getSpec().getJob() == null) {
+            return SESSION;
+        }
+        return APPLICATION;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 195defd..4ef0959 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -24,7 +24,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -65,8 +65,8 @@ public class FlinkDeploymentController
private final String operatorNamespace;
private final FlinkDeploymentValidator validator;
- private final Observer observer;
private final ReconcilerFactory reconcilerFactory;
+ private final ObserverFactory observerFactory;
private final DefaultConfig defaultConfig;
private final FlinkOperatorConfiguration operatorConfiguration;
@@ -78,15 +78,15 @@ public class FlinkDeploymentController
KubernetesClient kubernetesClient,
String operatorNamespace,
FlinkDeploymentValidator validator,
- Observer observer,
- ReconcilerFactory reconcilerFactory) {
+ ReconcilerFactory reconcilerFactory,
+ ObserverFactory observerFactory) {
this.defaultConfig = defaultConfig;
this.operatorConfiguration = operatorConfiguration;
this.kubernetesClient = kubernetesClient;
this.operatorNamespace = operatorNamespace;
this.validator = validator;
- this.observer = observer;
this.reconcilerFactory = reconcilerFactory;
+ this.observerFactory = observerFactory;
}
@Override
@@ -95,7 +95,7 @@ public class FlinkDeploymentController
Configuration effectiveConfig =
FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
try {
- observer.observe(flinkApp, context, effectiveConfig);
+ observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
} catch (DeploymentFailedException dfe) {
// ignore during cleanup
}
@@ -119,7 +119,7 @@ public class FlinkDeploymentController
FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
try {
- observer.observe(flinkApp, context, effectiveConfig);
+ observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
reconcilerFactory
.getOrCreate(flinkApp)
.reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
new file mode 100644
index 0000000..2dee79e
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Base {@link Observer} that tracks the JobManager deployment of a {@link FlinkDeployment}.
+ *
+ * <p>Mode-specific subclasses implement {@link #observe} and call {@link #observeJmDeployment}
+ * to keep the JobManager deployment status up to date.
+ */
+public abstract class BaseObserver implements Observer {
+
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Job state recorded when the job status cannot be determined from the cluster. */
+    public static final String JOB_STATE_UNKNOWN = "UNKNOWN";
+
+    protected final FlinkService flinkService;
+    protected final FlinkOperatorConfiguration operatorConfiguration;
+
+    public BaseObserver(
+            FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) {
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    /**
+     * Updates the JobManager deployment status stored in {@code flinkApp.getStatus()}.
+     *
+     * <p>READY is kept as is and DEPLOYED_NOT_READY is promoted to READY. Otherwise the secondary
+     * {@link Deployment} resource is inspected and the status becomes DEPLOYED_NOT_READY (all
+     * replicas available and the JobManager port is ready), DEPLOYING, or MISSING (no deployment
+     * found). The status is left unchanged if a replica failure is seen while already in ERROR.
+     *
+     * @param flinkApp the deployment whose status is updated in place
+     * @param context the reconciliation context providing the secondary {@link Deployment}
+     * @param effectiveConfig the effective Flink configuration used to probe the JobManager port
+     * @throws DeploymentFailedException if the deployment reports a {@code ReplicaFailure}
+     *     condition with reason {@code FailedCreate} and the status is not already ERROR
+     */
+    protected void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            // The port was already found ready on the previous observation; this extra round
+            // gives the REST server time to come up before the cluster is considered READY.
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (!deployment.isPresent()) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+            return;
+        }
+
+        DeploymentStatus status = deployment.get().getStatus();
+        DeploymentSpec spec = deployment.get().getSpec();
+        if (allReplicasAvailable(spec, status)
+                && flinkService.isJobManagerPortReady(effectiveConfig)) {
+            // typically it takes a few seconds for the REST server to be ready
+            logger.info(
+                    "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace());
+            deploymentStatus.setJobManagerDeploymentStatus(
+                    JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+            return;
+        }
+        logger.info(
+                "JobManager deployment {} in namespace {} exists but not ready yet, status {}",
+                flinkApp.getMetadata().getName(),
+                flinkApp.getMetadata().getNamespace(),
+                status);
+
+        // The Deployment may not have a status (or any conditions) yet; guard against null
+        // instead of failing the whole observation with a NullPointerException.
+        List<DeploymentCondition> conditions = status != null ? status.getConditions() : null;
+        if (conditions != null) {
+            for (DeploymentCondition dc : conditions) {
+                if ("FailedCreate".equals(dc.getReason())
+                        && "ReplicaFailure".equals(dc.getType())) {
+                    // throw only when not already in error status to allow for spec update
+                    if (!JobManagerDeploymentStatus.ERROR.equals(
+                            deploymentStatus.getJobManagerDeploymentStatus())) {
+                        throw new DeploymentFailedException(
+                                DeploymentFailedException.COMPONENT_JOBMANAGER, dc);
+                    }
+                    return;
+                }
+            }
+        }
+        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+    }
+
+    /**
+     * Returns whether both the current and the available replica counts match the desired count.
+     * A missing status or missing (null) counters are treated as "not available" rather than
+     * being unboxed.
+     */
+    private static boolean allReplicasAvailable(DeploymentSpec spec, DeploymentStatus status) {
+        if (status == null
+                || status.getReplicas() == null
+                || status.getAvailableReplicas() == null) {
+            return false;
+        }
+        int desiredReplicas = spec.getReplicas();
+        return desiredReplicas == status.getReplicas()
+                && desiredReplicas == status.getAvailableReplicas();
+    }
+
+    /** Returns whether the JobManager deployment of {@code dep} has been observed as READY. */
+    protected boolean isClusterReady(FlinkDeployment dep) {
+        return dep.getStatus().getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
new file mode 100644
index 0000000..ea5a58f
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} clusters.
+ *
+ * <p>Once the JobManager deployment is ready, it records the status of the most recently started
+ * job on the cluster and, if a savepoint is in progress, tracks its completion.
+ */
+public class JobObserver extends BaseObserver {
+
+    public JobObserver(
+            FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) {
+        super(flinkService, operatorConfiguration);
+    }
+
+    @Override
+    public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        if (isClusterReady(flinkApp)) {
+            // Savepoint progress is only checked once a job has actually been observed.
+            boolean jobFound = observeFlinkJobStatus(flinkApp, effectiveConfig);
+            if (jobFound) {
+                observeSavepointStatus(flinkApp, effectiveConfig);
+            }
+        }
+    }
+
+    /**
+     * Refreshes the job status of {@code flinkApp} from the jobs listed on the cluster.
+     *
+     * @return {@code true} if at least one job was found; {@code false} if listing the jobs failed
+     *     or no job exists yet, in which case the job state is set to {@link #JOB_STATE_UNKNOWN}
+     */
+    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        logger.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
+        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+
+        Collection<JobStatusMessage> clusterJobStatuses;
+        try {
+            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+        } catch (Exception e) {
+            logger.error("Exception while listing jobs", e);
+            flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
+            return false;
+        }
+        if (clusterJobStatuses.isEmpty()) {
+            logger.info("No jobs found on {} yet", flinkApp.getMetadata().getName());
+            flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
+            return false;
+        }
+
+        updateJobStatus(flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses));
+        logger.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
+        return true;
+    }
+
+    /**
+     * Update previous job status based on the job list from the cluster.
+     *
+     * <p>The most recently started job wins. {@code clusterJobStatuses} must be non-empty and
+     * mutable; it is sorted in place by descending start time.
+     */
+    private void updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
+        Collections.sort(
+                clusterJobStatuses, (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime()));
+        JobStatusMessage newJob = clusterJobStatuses.get(0);
+
+        status.setState(newJob.getJobState().name());
+        status.setJobName(newJob.getJobName());
+        status.setJobId(newJob.getJobId().toHexString());
+        // track the start time, changing timestamp would cause busy reconciliation
+        status.setUpdateTime(String.valueOf(newJob.getStartTime()));
+    }
+
+    /**
+     * Checks the progress of an in-flight savepoint and records it once it has completed.
+     *
+     * <p>The savepoint may not be reported as triggered on the cluster. In that case, if an error
+     * is reported or the grace period has ended, the trigger is reset and a reconciliation error
+     * is recorded on {@code flinkApp}.
+     */
+    private void observeSavepointStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        SavepointInfo savepointInfo = flinkApp.getStatus().getJobStatus().getSavepointInfo();
+        if (!SavepointUtils.savepointInProgress(flinkApp)) {
+            logger.debug("Savepoint not in progress");
+            return;
+        }
+        SavepointFetchResult savepointFetchResult;
+        try {
+            savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, effectiveConfig);
+        } catch (Exception e) {
+            logger.error("Exception while fetching savepoint info", e);
+            return;
+        }
+
+        if (!savepointFetchResult.isTriggered()) {
+            String error = savepointFetchResult.getError();
+            if (error != null
+                    || SavepointUtils.gracePeriodEnded(operatorConfiguration, savepointInfo)) {
+                String errorMsg = error != null ? error : "Savepoint status unknown";
+                logger.error(errorMsg);
+                savepointInfo.resetTrigger();
+                ReconciliationUtils.updateForReconciliationError(flinkApp, errorMsg);
+                return;
+            }
+            logger.info("Savepoint operation not running, waiting within grace period");
+        }
+        if (savepointFetchResult.getSavepoint() == null) {
+            logger.info("Savepoint not completed yet");
+            return;
+        }
+        savepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint());
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 51ccf75..e354689 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -18,188 +18,19 @@
package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
-import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
-import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.javaoperatorsdk.operator.api.reconciler.Context;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-/** Observes the actual state of the running jobs on the Flink cluster. */
-public class Observer {
-
- private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
-
- public static final String JOB_STATE_UNKNOWN = "UNKNOWN";
-
- private final FlinkService flinkService;
- private final FlinkOperatorConfiguration operatorConfiguration;
-
- public Observer(FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) {
- this.flinkService = flinkService;
- this.operatorConfiguration = operatorConfiguration;
- }
-
- public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
- observeJmDeployment(flinkApp, context, effectiveConfig);
- if (isApplicationClusterReady(flinkApp)) {
- boolean jobFound = observeFlinkJobStatus(flinkApp, effectiveConfig);
- if (jobFound) {
- observeSavepointStatus(flinkApp, effectiveConfig);
- }
- }
- }
-
- private boolean isApplicationClusterReady(FlinkDeployment dep) {
- return dep.getSpec().getJob() != null
- && dep.getStatus().getJobManagerDeploymentStatus()
- == JobManagerDeploymentStatus.READY;
- }
-
- private void observeJmDeployment(
- FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
- FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
- JobManagerDeploymentStatus previousJmStatus =
- deploymentStatus.getJobManagerDeploymentStatus();
-
- if (JobManagerDeploymentStatus.READY == previousJmStatus) {
- return;
- }
-
- if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
- deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
- return;
- }
-
- Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
- if (deployment.isPresent()) {
- DeploymentStatus status = deployment.get().getStatus();
- DeploymentSpec spec = deployment.get().getSpec();
- if (status != null
- && status.getAvailableReplicas() != null
- && spec.getReplicas().intValue() == status.getReplicas()
- && spec.getReplicas().intValue() == status.getAvailableReplicas()
- && flinkService.isJobManagerPortReady(effectiveConfig)) {
-
- // typically it takes a few seconds for the REST server to be ready
- LOG.info(
- "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace());
- deploymentStatus.setJobManagerDeploymentStatus(
- JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
- return;
- }
- LOG.info(
- "JobManager deployment {} in namespace {} exists but not ready yet, status {}",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace(),
- status);
-
- List<DeploymentCondition> conditions = status.getConditions();
- for (DeploymentCondition dc : conditions) {
- if ("FailedCreate".equals(dc.getReason())
- && "ReplicaFailure".equals(dc.getType())) {
- // throw only when not already in error status to allow for spec update
- if (!JobManagerDeploymentStatus.ERROR.equals(
- deploymentStatus.getJobManagerDeploymentStatus())) {
- throw new DeploymentFailedException(
- DeploymentFailedException.COMPONENT_JOBMANAGER, dc);
- }
- return;
- }
- }
- deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
- return;
- }
-
- deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- }
-
- private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
- LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
- FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
-
- Collection<JobStatusMessage> clusterJobStatuses;
- try {
- clusterJobStatuses = flinkService.listJobs(effectiveConfig);
- } catch (Exception e) {
- LOG.error("Exception while listing jobs", e);
- flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
- return false;
- }
- if (clusterJobStatuses.isEmpty()) {
- LOG.info("No jobs found on {} yet", flinkApp.getMetadata().getName());
- flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
- return false;
- }
-
- updateJobStatus(flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses));
- LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
- return true;
- }
-
- private void observeSavepointStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
- SavepointInfo savepointInfo = flinkApp.getStatus().getJobStatus().getSavepointInfo();
- if (!SavepointUtils.savepointInProgress(flinkApp)) {
- LOG.debug("Checkpointing not in progress");
- return;
- }
- SavepointFetchResult savepointFetchResult;
- try {
- savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, effectiveConfig);
- } catch (Exception e) {
- LOG.error("Exception while fetching savepoint info", e);
- return;
- }
-
- if (!savepointFetchResult.isTriggered()) {
- String error = savepointFetchResult.getError();
- if (error != null
- || SavepointUtils.gracePeriodEnded(operatorConfiguration, savepointInfo)) {
- String errorMsg = error != null ? error : "Savepoint status unknown";
- LOG.error(errorMsg);
- savepointInfo.resetTrigger();
- ReconciliationUtils.updateForReconciliationError(flinkApp, errorMsg);
- return;
- }
- LOG.info("Savepoint operation not running, waiting within grace period");
- }
- if (savepointFetchResult.getSavepoint() == null) {
- LOG.info("Savepoint not completed yet");
- return;
- }
- savepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint());
- }
-
- /** Update previous job status based on the job list from the cluster. */
- private void updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
- Collections.sort(
- clusterJobStatuses, (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime()));
- JobStatusMessage newJob = clusterJobStatuses.get(0);
-
- status.setState(newJob.getJobState().name());
- status.setJobName(newJob.getJobName());
- status.setJobId(newJob.getJobId().toHexString());
- // track the start time, changing timestamp would cause busy reconciliation
- status.setUpdateTime(String.valueOf(newJob.getStartTime()));
- }
+/**
+ * Observes the actual state of a {@link FlinkDeployment} and records it on the resource's status.
+ *
+ * <p>Implementations are specific to the deployment mode (session or application).
+ */
+public interface Observer {
+
+ /**
+ * Observes the current state of the given deployment and reflects any change on its status.
+ *
+ * @param flinkApp the target FlinkDeployment resource; its status is updated in place
+ * @param context the context with which the operation is executed
+ * @param effectiveConfig the effective Flink configuration of the flinkApp
+ */
+ void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
similarity index 57%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
index 94050ce..d3ec8b9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
@@ -16,59 +16,43 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.service.FlinkService;
-import io.fabric8.kubernetes.client.KubernetesClient;
-
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-/** The factory to create reconciler based on app mode. */
-public class ReconcilerFactory {
+/** The factory to create the observer based ob the {@link FlinkDeployment} mode. */
+public class ObserverFactory {
- private final KubernetesClient kubernetesClient;
private final FlinkService flinkService;
private final FlinkOperatorConfiguration operatorConfiguration;
- private final Map<Mode, Reconciler> reconcilerMap;
+ private final Map<Mode, Observer> observerMap;
- public ReconcilerFactory(
- KubernetesClient kubernetesClient,
- FlinkService flinkService,
- FlinkOperatorConfiguration operatorConfiguration) {
- this.kubernetesClient = kubernetesClient;
+ public ObserverFactory(
+ FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) {
this.flinkService = flinkService;
this.operatorConfiguration = operatorConfiguration;
- this.reconcilerMap = new ConcurrentHashMap<>();
+ this.observerMap = new ConcurrentHashMap<>();
}
- public Reconciler getOrCreate(FlinkDeployment flinkApp) {
- return reconcilerMap.computeIfAbsent(
- getMode(flinkApp),
+ public Observer getOrCreate(FlinkDeployment flinkApp) {
+ return observerMap.computeIfAbsent(
+ Mode.getMode(flinkApp),
mode -> {
switch (mode) {
case SESSION:
- return new SessionReconciler(
- kubernetesClient, flinkService, operatorConfiguration);
+ return new SessionObserver(flinkService, operatorConfiguration);
case APPLICATION:
- return new JobReconciler(
- kubernetesClient, flinkService, operatorConfiguration);
+ return new JobObserver(flinkService, operatorConfiguration);
default:
throw new UnsupportedOperationException(
String.format("Unsupported running mode: %s", mode));
}
});
}
-
- private Mode getMode(FlinkDeployment flinkApp) {
- return flinkApp.getSpec().getJob() != null ? Mode.APPLICATION : Mode.SESSION;
- }
-
- enum Mode {
- APPLICATION,
- SESSION
- }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
new file mode 100644
index 0000000..5cb594c
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+
+/**
+ * Observer for {@link org.apache.flink.kubernetes.operator.config.Mode#SESSION} deployments.
+ *
+ * <p>A session deployment has no job in its spec, so only the JobManager deployment is observed.
+ */
+public class SessionObserver extends BaseObserver {
+
+    public SessionObserver(FlinkService service, FlinkOperatorConfiguration config) {
+        super(service, config);
+    }
+
+    /** Observes only the JobManager deployment; no job or savepoint status is tracked here. */
+    @Override
+    public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index a43febd..2d71459 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
-import static org.apache.flink.kubernetes.operator.observer.Observer.JOB_STATE_UNKNOWN;
+import static org.apache.flink.kubernetes.operator.observer.BaseObserver.JOB_STATE_UNKNOWN;
/**
* Reconciler responsible for handling the job lifecycle according to the desired and current
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
index 94050ce..bdcd8e4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -47,7 +48,7 @@ public class ReconcilerFactory {
public Reconciler getOrCreate(FlinkDeployment flinkApp) {
return reconcilerMap.computeIfAbsent(
- getMode(flinkApp),
+ Mode.getMode(flinkApp),
mode -> {
switch (mode) {
case SESSION:
@@ -62,13 +63,4 @@ public class ReconcilerFactory {
}
});
}
-
- private Mode getMode(FlinkDeployment flinkApp) {
- return flinkApp.getSpec().getJob() != null ? Mode.APPLICATION : Mode.SESSION;
- }
-
- enum Mode {
- APPLICATION,
- SESSION
- }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 658aa18..226186a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -446,7 +446,6 @@ public class FlinkDeploymentControllerTest {
private FlinkDeploymentController createTestController(
KubernetesClient kubernetesClient, TestingFlinkService flinkService) {
- Observer observer = new Observer(flinkService, operatorConfiguration);
FlinkDeploymentController controller =
new FlinkDeploymentController(
@@ -455,9 +454,9 @@ public class FlinkDeploymentControllerTest {
kubernetesClient,
"test",
new DefaultDeploymentValidator(),
- observer,
new ReconcilerFactory(
- kubernetesClient, flinkService, operatorConfiguration));
+ kubernetesClient, flinkService, operatorConfiguration),
+ new ObserverFactory(flinkService, operatorConfiguration));
controller.setControllerConfig(new FlinkControllerConfig(controller));
return controller;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
similarity index 83%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index ea15298..c3869a4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -35,48 +34,16 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-/** @link Observer unit tests */
-public class ObserverTest {
+/** {@link JobObserver} unit tests. */
+public class JobObserverTest {
private final Context readyContext = TestUtils.createContextWithReadyJobManagerDeployment();
@Test
- public void observeSessionCluster() {
- FlinkService flinkService = new TestingFlinkService();
- Observer observer =
- new Observer(
- flinkService,
- FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
- FlinkDeployment deployment = TestUtils.buildSessionCluster();
- deployment
- .getStatus()
- .getReconciliationStatus()
- .setLastReconciledSpec(deployment.getSpec());
-
- observer.observe(
- deployment,
- readyContext,
- FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
-
- assertEquals(
- JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
- deployment.getStatus().getJobManagerDeploymentStatus());
-
- observer.observe(
- deployment,
- readyContext,
- FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
-
- assertEquals(
- JobManagerDeploymentStatus.READY,
- deployment.getStatus().getJobManagerDeploymentStatus());
- }
-
- @Test
public void observeApplicationCluster() {
TestingFlinkService flinkService = new TestingFlinkService();
- Observer observer =
- new Observer(
+ JobObserver observer =
+ new JobObserver(
flinkService,
FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
@@ -136,14 +103,15 @@ public class ObserverTest {
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- assertEquals(Observer.JOB_STATE_UNKNOWN, deployment.getStatus().getJobStatus().getState());
+ assertEquals(
+ BaseObserver.JOB_STATE_UNKNOWN, deployment.getStatus().getJobStatus().getState());
}
@Test
public void observeSavepoint() throws Exception {
TestingFlinkService flinkService = new TestingFlinkService();
- Observer observer =
- new Observer(
+ JobObserver observer =
+ new JobObserver(
flinkService,
FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
new file mode 100644
index 0000000..6e7cb48
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** {@link SessionObserver} unit tests. */
+public class SessionObserverTest {
+ private final Context readyContext = TestUtils.createContextWithReadyJobManagerDeployment();
+
+ @Test
+ public void observeSessionCluster() {
+ FlinkService flinkService = new TestingFlinkService();
+ SessionObserver observer =
+ new SessionObserver(
+ flinkService,
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+ FlinkDeployment deployment = TestUtils.buildSessionCluster();
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(deployment.getSpec());
+ // First pass: even with the ready context the JM status only reaches DEPLOYED_NOT_READY.
+ observer.observe(
+ deployment,
+ readyContext,
+ FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ // Second pass with the same context: the JM status is promoted to READY.
+ observer.observe(
+ deployment,
+ readyContext,
+ FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ }
+}