You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/25 16:37:43 UTC

[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Aitozi commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906685879


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        if (!reconcileInternal(cr, ctx)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Try to reconcile the current {@link AbstractFlinkResource} to the desired spec.
+     *
+     * @param cr Resource being reconciled.
+     * @param ctx Current context.
+     * @return True if a reconciliation action was triggered, False if no action required.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileInternal(CR cr, Context ctx) throws Exception;
+
+    protected boolean readyToReconcile(CR cr, Context ctx) {
+        return true;
+    }
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Config to be used during deployment.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,
+            SPEC spec,
+            STATUS status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
+            throws Exception;
+
+    /**
+     * Shut down and clean up all Flink job/cluster resources.
+     *
+     * @param resource Resource being reconciled.
+     * @param context Current context.
+     * @return DeleteControl object.
+     */
+    protected abstract DeleteControl cleanupInternal(CR resource, Context context);
+
+    /**
+     * Checks whether the desired spec already matches the currently deployed spec. If they match
+     * the resource status is updated to reflect successful reconciliation.
+     *
+     * @param resource Resource being reconciled.
+     * @param deployConf Deploy configuration for the Flink resource.
+     * @return True if desired spec was already deployed.
+     */
+    protected boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
+        AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
+        if (resource.getSpec().equals(deployedSpec)) {
+            LOG.info(
+                    "The new spec matches the currently deployed last stable spec. No upgrade needed.");
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource,
+                    deployedSpec.getJob() != null ? deployedSpec.getJob().getState() : null,
+                    deployConf);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated. This
+     * is triggered only if, jm deployment missing, recovery config and HA enabled.
+     *
+     * @param conf Flink cluster configuration.
+     * @param deployment FlinkDeployment object.
+     * @return True if recovery should be executed.
+     */
+    protected static boolean shouldRecoverDeployment(

Review Comment:
   This does not seem to need to be static.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        if (!reconcileInternal(cr, ctx)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Try to reconcile the current {@link AbstractFlinkResource} to the desired spec.
+     *
+     * @param cr Resource being reconciled.
+     * @param ctx Current context.
+     * @return True if a reconciliation action was triggered, False if no action required.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileInternal(CR cr, Context ctx) throws Exception;
+
+    protected boolean readyToReconcile(CR cr, Context ctx) {
+        return true;
+    }
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Config to be used during deployment.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,

Review Comment:
   Can these interfaces work directly with the `CR` object? The meta, spec, and status can all be obtained from it.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkJobReconciler.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * Reconciler responsible for handling the job lifecycle according to the desired and current
+ * states.
+ */
+public abstract class FlinkJobReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        extends FlinkReconciler<CR, SPEC, STATUS> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobReconciler.class);
+
+    public FlinkJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        super(kubernetesClient, flinkService, configManager, eventRecorder);
+    }
+
+    @Override
+    public boolean reconcileInternal(CR resource, Context context) throws Exception {
+        var deployMeta = resource.getMetadata();
+        STATUS status = resource.getStatus();
+        var reconciliationStatus = status.getReconciliationStatus();
+        SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = resource.getSpec();
+
+        var deployConfig = getDeployConfig(deployMeta, currentDeploySpec, context);
+        if (shouldWaitForPendingSavepoint(status.getJobStatus(), deployConfig)) {
+            LOG.info("Delaying job reconciliation until pending savepoint is completed.");
+            return true;

Review Comment:
   The meaning of the return value does not match the comments: no action is actually triggered here. I think the return value has the semantics of whether to skip the remaining reconcile functions.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkJobReconciler.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * Reconciler responsible for handling the job lifecycle according to the desired and current
+ * states.
+ */
+public abstract class FlinkJobReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        extends FlinkReconciler<CR, SPEC, STATUS> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobReconciler.class);
+
+    public FlinkJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        super(kubernetesClient, flinkService, configManager, eventRecorder);
+    }
+
+    @Override
+    public boolean reconcileInternal(CR resource, Context context) throws Exception {
+        var deployMeta = resource.getMetadata();
+        STATUS status = resource.getStatus();
+        var reconciliationStatus = status.getReconciliationStatus();
+        SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = resource.getSpec();
+
+        var deployConfig = getDeployConfig(deployMeta, currentDeploySpec, context);
+        if (shouldWaitForPendingSavepoint(status.getJobStatus(), deployConfig)) {
+            LOG.info("Delaying job reconciliation until pending savepoint is completed.");
+            return true;
+        }
+
+        var observeConfig = getObserveConfig(resource, context);
+        boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
+        if (specChanged) {
+            if (checkNewSpecAlreadyDeployed(resource, deployConfig)) {
+                return true;
+            }
+            LOG.debug("Detected spec change, starting upgrade process.");
+            JobState currentJobState = lastReconciledSpec.getJob().getState();
+            JobState desiredJobState = currentDeploySpec.getJob().getState();
+            JobState newState = currentJobState;
+            if (currentJobState == JobState.RUNNING) {
+                if (desiredJobState == JobState.RUNNING) {
+                    LOG.info("Upgrading/Restarting running job, suspending first...");
+                }
+                Optional<UpgradeMode> availableUpgradeMode =
+                        getAvailableUpgradeMode(resource, deployConfig, observeConfig);
+                if (availableUpgradeMode.isEmpty()) {
+                    return true;
+                }
+                // We must record the upgrade mode used to the status later
+                currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
+                cancelJob(resource, availableUpgradeMode.get(), observeConfig);
+                newState = JobState.SUSPENDED;
+            }
+            if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
+                restoreJob(
+                        deployMeta,
+                        currentDeploySpec,
+                        status,
+                        deployConfig,
+                        // We decide to enforce HA based on how job was previously suspended
+                        lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
+                newState = JobState.RUNNING;
+            }
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource, newState, deployConfig);
+        } else if (shouldRollBack(reconciliationStatus, observeConfig)) {

Review Comment:
   Do `shouldRollBack` and `rollbackApplication` need to be abstract methods so that the Application and SessionJob can implement them?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(

Review Comment:
   nit: I think moving this to the subclass would be more accurate. Although `ReconciliationUtils.updateForSpecReconciliationSuccess` can handle the session cluster, the `FlinkReconciler` does not have to set the JobState.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org