Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/22 07:30:17 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

gyfora opened a new pull request, #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274

   This PR refactors and unifies the reconciler hierarchy we have right now:
   - SessionReconciler
   - AbstractDeploymentReconciler
     - ApplicationReconciler
     - SessionJobReconciler

   The new hierarchy is:
   - FlinkReconciler
     - FlinkJobReconciler
       - ApplicationReconciler
       - SessionJobReconciler
     - SessionReconciler
   
   This also fully unifies the application and session job management logic, and makes previously missing features, such as rollbacks, available for session jobs as well.

   In addition, it improves session job test coverage by removing some unnecessary mock methods from the TestingFlinkService.
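
As a rough illustration of the new hierarchy, here is a minimal, self-contained Java sketch. It is not the code from this PR: the real classes are generic over the resource, spec and status types and talk to Kubernetes and the FlinkService. The `deployed` and `specChanged` flags, the `actions` list and the `kind()` method are hypothetical stand-ins; only the class names and the `reconcile` / `reconcileInternal` / `deploy` split mirror the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class ReconcilerHierarchySketch {

    /** Simplified stand-in for the shared base class. */
    abstract static class FlinkReconciler {
        // Hypothetical: records what the reconciler did, instead of calling Flink.
        final List<String> actions = new ArrayList<>();
        // Hypothetical: stands in for "lastReconciledSpec != null".
        boolean deployed;

        // Template method: the overall flow is fixed, subclasses fill in the steps.
        final void reconcile() {
            if (!deployed) {
                deploy();
                deployed = true;
                return;
            }
            if (!reconcileInternal()) {
                actions.add("nothing to do");
            }
        }

        abstract void deploy();

        abstract boolean reconcileInternal();
    }

    /** Job logic shared by applications and session jobs. */
    abstract static class FlinkJobReconciler extends FlinkReconciler {
        // Hypothetical flag: the real code compares the current and last reconciled spec.
        boolean specChanged;

        @Override
        boolean reconcileInternal() {
            if (specChanged) {
                actions.add("upgrade " + kind());
                specChanged = false;
                return true;
            }
            return false;
        }

        abstract String kind();
    }

    static class ApplicationReconciler extends FlinkJobReconciler {
        @Override
        void deploy() {
            actions.add("deploy application cluster");
        }

        @Override
        String kind() {
            return "application";
        }
    }

    static class SessionJobReconciler extends FlinkJobReconciler {
        @Override
        void deploy() {
            actions.add("submit session job");
        }

        @Override
        String kind() {
            return "session job";
        }
    }

    static class SessionReconciler extends FlinkReconciler {
        @Override
        void deploy() {
            actions.add("deploy session cluster");
        }

        @Override
        boolean reconcileInternal() {
            return false; // no job to upgrade in this sketch
        }
    }

    public static void main(String[] args) {
        SessionJobReconciler job = new SessionJobReconciler();
        job.reconcile();        // first deployment
        job.specChanged = true;
        job.reconcile();        // goes through the shared job upgrade path
        System.out.println(job.actions);
    }
}
```

Because the upgrade path lives in the shared job-level class, anything added there applies to applications and session jobs alike, which is the point of the unification described above.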


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r907153684


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Base class for all Flink resource reconcilers. It contains the general flow of reconciling Flink
+ * related resources including initial deployments, upgrades, rollbacks etc.
+ */
+public abstract class AbstractFlinkResourceReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public AbstractFlinkResourceReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        var spec = cr.getSpec();
+        var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+        var status = cr.getStatus();
+
+        // If the resource is not ready for reconciliation we simply return
+        if (!readyToReconcile(cr, ctx, deployConfig)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var firstDeployment = status.getReconciliationStatus().getLastReconciledSpec() == null;
+
+        // If this is the first deployment for the resource we simply submit the job and return.
+        // No further logic is required at this point.
+        if (firstDeployment) {
+            LOG.info("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        var reconciliationStatus = cr.getStatus().getReconciliationStatus();
+        SPEC lastReconciledSpec =
+                cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = cr.getSpec();
+
+        boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
+        var observeConfig = getObserveConfig(cr, ctx);
+
+        if (specChanged) {
+            if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
+                return;
+            }
+            LOG.info("Reconciling spec change");
+            reconcileSpecChange(cr, observeConfig, deployConfig);
+        } else if (shouldRollBack(reconciliationStatus, observeConfig)) {
+            // Rollbacks are executed in two steps, we initiate it first then return
+            if (initiateRollBack(status)) {
+                return;
+            }
+            LOG.warn("Executing rollback operation");
+            rollback(cr, ctx, observeConfig);
+        } else if (!reconcileOtherChanges(cr, observeConfig)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Deployment configuration.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Get Flink configuration for client interactions with the running Flink deployment/session
+     * job.
+     *
+     * @param resource Related Flink resource.
+     * @param context Reconciliation context.
+     * @return Observe configuration.
+     */
+    protected abstract Configuration getObserveConfig(CR resource, Context context);
+
+    /**
+     * Check whether the given Flink resource is ready to be reconciled or we are still waiting for
+     * any pending operation or condition first.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param deployConfig Deployment configuration.
+     * @return True if the resource is ready to be reconciled.
+     */
+    protected abstract boolean readyToReconcile(CR cr, Context ctx, Configuration deployConfig);
+
+    /**
+     * Reconcile spec upgrade on the currently deployed/suspended Flink resource and update the
+     * status accordingly.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @param deployConfig Deployment configuration.
+     * @throws Exception
+     */
+    protected abstract void reconcileSpecChange(
+            CR cr, Configuration observeConfig, Configuration deployConfig) throws Exception;
+
+    /**
+     * Rollback deployed resource to the last stable spec.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param observeConfig Observe configuration.
+     * @throws Exception
+     */
+    protected abstract void rollback(CR cr, Context ctx, Configuration observeConfig)
+            throws Exception;
+
+    /**
+     * Reconcile any other changes required for this resource that are specific to the reconciler
+     * implementation.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @return True if any further reconciliation action was taken.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileOtherChanges(CR cr, Configuration observeConfig)
+            throws Exception;
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,
+            SPEC spec,
+            STATUS status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
+            throws Exception;
+
+    /**
+     * Shut down and clean up all Flink job/cluster resources.
+     *
+     * @param resource Resource being reconciled.
+     * @param context Current context.
+     * @return DeleteControl object.
+     */
+    protected abstract DeleteControl cleanupInternal(CR resource, Context context);
+
+    /**
+     * Checks whether the desired spec already matches the currently deployed spec. If they match
+     * the resource status is updated to reflect successful reconciliation.
+     *
+     * @param resource Resource being reconciled.
+     * @param deployConf Deploy configuration for the Flink resource.
+     * @return True if desired spec was already deployed.
+     */
+    private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
+        AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
+        if (resource.getSpec().equals(deployedSpec)) {
+            LOG.info(
+                    "The new spec matches the currently deployed last stable spec. No upgrade needed.");
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource,
+                    deployedSpec.getJob() != null ? deployedSpec.getJob().getState() : null,
+                    deployConf);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the currently deployed Flink resource spec should be rolled back to the stable
+     * spec. This includes validating the current deployment status, config and checking if the last
+     * reconciled spec did not become stable within the configured grace period.
+     *
+     * <p>Rollbacks are only supported to previously running resource specs with HA enabled.
+     *
+     * @param reconciliationStatus ReconciliationStatus of the resource.
+     * @param configuration Flink cluster configuration.
+     * @return True if the resource should be rolled back to the last stable spec.
+     */
+    private boolean shouldRollBack(

Review Comment:
   Maybe I will add some simple logic in this PR to mark it as stable in the observer if the state is running, and we can add tests in a follow-up.
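
One possible reading of this suggestion, sketched in plain Java under stated assumptions. The `SketchStatus` class, the `JobStatus` enum and both method bodies are hypothetical and are not the operator's API. The idea: once the observer sees the job running, the last reconciled spec is recorded as the stable one, so the rollback check no longer fires. The real `shouldRollBack` also validates the deployment status and configuration, which this sketch leaves out.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

public class StableSpecSketch {

    // Hypothetical, reduced set of job states.
    enum JobStatus { RUNNING, FAILED }

    // Hypothetical stand-in for the reconciliation status of a resource.
    static class SketchStatus {
        String lastReconciledSpec;
        String lastStableSpec;
        Instant reconciledAt;
    }

    /** Observer step: a running job means the last reconciled spec is stable. */
    static void observe(SketchStatus status, JobStatus observed) {
        if (observed == JobStatus.RUNNING && status.lastReconciledSpec != null) {
            status.lastStableSpec = status.lastReconciledSpec;
        }
    }

    /** Roll back only if a different stable spec exists and the grace period has passed. */
    static boolean shouldRollBack(SketchStatus status, Duration gracePeriod, Instant now) {
        if (status.lastStableSpec == null || status.lastReconciledSpec == null) {
            return false; // nothing to roll back to, or nothing deployed yet
        }
        if (Objects.equals(status.lastReconciledSpec, status.lastStableSpec)) {
            return false; // the deployed spec is already the stable one
        }
        return now.isAfter(status.reconciledAt.plus(gracePeriod));
    }

    public static void main(String[] args) {
        SketchStatus status = new SketchStatus();
        status.lastStableSpec = "v1";
        status.lastReconciledSpec = "v2";
        status.reconciledAt = Instant.now().minus(Duration.ofMinutes(10));

        Duration grace = Duration.ofMinutes(5);
        System.out.println(shouldRollBack(status, grace, Instant.now()));
        observe(status, JobStatus.RUNNING);
        System.out.println(shouldRollBack(status, grace, Instant.now()));
    }
}
```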





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r907159561


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -0,0 +1,355 @@
+    /**
+     * Checks whether the currently deployed Flink resource spec should be rolled back to the stable
+     * spec. This includes validating the current deployment status, config and checking if the last
+     * reconciled spec did not become stable within the configured grace period.
+     *
+     * <p>Rollbacks are only supported to previously running resource specs with HA enabled.
+     *
+     * @param reconciliationStatus ReconciliationStatus of the resource.
+     * @param configuration Flink cluster configuration.
+     * @return True if the resource should be rolled back to the last stable spec.
+     */
+    private boolean shouldRollBack(

Review Comment:
   +1 for it. I have no other comments; feel free to merge after this :)





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906772181


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        if (!reconcileInternal(cr, ctx)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Try to reconcile the current {@link AbstractFlinkResource} to the desired spec.
+     *
+     * @param cr Resource being reconciled.
+     * @param ctx Current context.
+     * @return True if a reconciliation action was triggered, False if no action required.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileInternal(CR cr, Context ctx) throws Exception;
+
+    protected boolean readyToReconcile(CR cr, Context ctx) {
+        return true;
+    }
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Config to be used during deployment.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,
+            SPEC spec,
+            STATUS status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
+            throws Exception;
+
+    /**
+     * Shut down and clean up all Flink job/cluster resources.
+     *
+     * @param resource Resource being reconciled.
+     * @param context Current context.
+     * @return DeleteControl object.
+     */
+    protected abstract DeleteControl cleanupInternal(CR resource, Context context);
+
+    /**
+     * Checks whether the desired spec already matches the currently deployed spec. If they match
+     * the resource status is updated to reflect successful reconciliation.
+     *
+     * @param resource Resource being reconciled.
+     * @param deployConf Deploy configuration for the Flink resource.
+     * @return True if desired spec was already deployed.
+     */
+    protected boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
+        AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
+        if (resource.getSpec().equals(deployedSpec)) {
+            LOG.info(
+                    "The new spec matches the currently deployed last stable spec. No upgrade needed.");
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource,
+                    deployedSpec.getJob() != null ? deployedSpec.getJob().getState() : null,
+                    deployConf);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated. This
+     * is triggered only if the JM deployment is missing and both the recovery config and HA are
+     * enabled.
+     *
+     * @param conf Flink cluster configuration.
+     * @param deployment FlinkDeployment object.
+     * @return True if recovery should be executed.
+     */
+    protected static boolean shouldRecoverDeployment(

Review Comment:
   But it could easily be static, so I think it's better to keep it that way :)
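
   As a rough illustration of why a static method fits here, the following sketch assumes the recovery decision depends only on three flags. `RecoveryCheckSketch`, `shouldRecover` and its parameters are hypothetical names for illustration, not the signature used in this PR.

```java
/** Hypothetical sketch; names and parameters are illustrative, not the operator's API. */
final class RecoveryCheckSketch {
    private RecoveryCheckSketch() {}

    /**
     * Pure decision function: the result depends only on the arguments, so the
     * method can be static and tested without constructing a reconciler.
     */
    static boolean shouldRecover(
            boolean jmDeploymentMissing, boolean recoveryEnabled, boolean haEnabled) {
        return jmDeploymentMissing && recoveryEnabled && haEnabled;
    }
}
```

   A call such as `RecoveryCheckSketch.shouldRecover(true, true, false)` needs no reconciler instance, which is the main practical benefit of keeping such checks static.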





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906975589


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        if (!reconcileInternal(cr, ctx)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Try to reconcile the current {@link AbstractFlinkResource} to the desired spec.
+     *
+     * @param cr Resource being reconciled.
+     * @param ctx Current context.
+     * @return True if a reconciliation action was triggered, False if no action required.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileInternal(CR cr, Context ctx) throws Exception;
+
+    protected boolean readyToReconcile(CR cr, Context ctx) {
+        return true;
+    }
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Config to be used during deployment.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,

Review Comment:
   Got it.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r907193815


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Base class for all Flink resource reconcilers. It contains the general flow of reconciling Flink
+ * related resources including initial deployments, upgrades, rollbacks etc.
+ */
+public abstract class AbstractFlinkResourceReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public AbstractFlinkResourceReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        var spec = cr.getSpec();
+        var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+        var status = cr.getStatus();
+
+        // If the resource is not ready for reconciliation we simply return
+        if (!readyToReconcile(cr, ctx, deployConfig)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var firstDeployment = status.getReconciliationStatus().getLastReconciledSpec() == null;
+
+        // If this is the first deployment for the resource we simply submit the job and return.
+        // No further logic is required at this point.
+        if (firstDeployment) {
+            LOG.info("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        var reconciliationStatus = cr.getStatus().getReconciliationStatus();
+        SPEC lastReconciledSpec =
+                cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = cr.getSpec();
+
+        boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
+        var observeConfig = getObserveConfig(cr, ctx);
+
+        if (specChanged) {
+            if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
+                return;
+            }
+            LOG.info("Reconciling spec change");
+            reconcileSpecChange(cr, observeConfig, deployConfig);
+        } else if (shouldRollBack(reconciliationStatus, observeConfig)) {
+            // Rollbacks are executed in two steps, we initiate it first then return
+            if (initiateRollBack(status)) {
+                return;
+            }
+            LOG.warn("Executing rollback operation");
+            rollback(cr, ctx, observeConfig);
+        } else if (!reconcileOtherChanges(cr, observeConfig)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Deployment configuration.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Get Flink configuration for client interactions with the running Flink deployment/session
+     * job.
+     *
+     * @param resource Related Flink resource.
+     * @param context Reconciliation context.
+     * @return Observe configuration.
+     */
+    protected abstract Configuration getObserveConfig(CR resource, Context context);
+
+    /**
+     * Check whether the given Flink resource is ready to be reconciled or we are still waiting for
+     * any pending operation or condition first.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param deployConfig Deployment configuration.
+     * @return True if the resource is ready to be reconciled.
+     */
+    protected abstract boolean readyToReconcile(CR cr, Context ctx, Configuration deployConfig);
+
+    /**
+     * Reconcile spec upgrade on the currently deployed/suspended Flink resource and update the
+     * status accordingly.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @param deployConfig Deployment configuration.
+     * @throws Exception
+     */
+    protected abstract void reconcileSpecChange(
+            CR cr, Configuration observeConfig, Configuration deployConfig) throws Exception;
+
+    /**
+     * Rollback deployed resource to the last stable spec.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param observeConfig Observe configuration.
+     * @throws Exception
+     */
+    protected abstract void rollback(CR cr, Context ctx, Configuration observeConfig)
+            throws Exception;
+
+    /**
+     * Reconcile any other changes required for this resource that are specific to the reconciler
+     * implementation.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @return True if any further reconciliation action was taken.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileOtherChanges(CR cr, Configuration observeConfig)
+            throws Exception;
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,
+            SPEC spec,
+            STATUS status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
+            throws Exception;
+
+    /**
+     * Shut down and clean up all Flink job/cluster resources.
+     *
+     * @param resource Resource being reconciled.
+     * @param context Current context.
+     * @return DeleteControl object.
+     */
+    protected abstract DeleteControl cleanupInternal(CR resource, Context context);
+
+    /**
+     * Checks whether the desired spec already matches the currently deployed spec. If they match
+     * the resource status is updated to reflect successful reconciliation.
+     *
+     * @param resource Resource being reconciled.
+     * @param deployConf Deploy configuration for the Flink resource.
+     * @return True if desired spec was already deployed.
+     */
+    private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
+        AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
+        if (resource.getSpec().equals(deployedSpec)) {
+            LOG.info(
+                    "The new spec matches the currently deployed last stable spec. No upgrade needed.");
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource,
+                    deployedSpec.getJob() != null ? deployedSpec.getJob().getState() : null,
+                    deployConf);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the currently deployed Flink resource spec should be rolled back to the stable
+     * spec. This includes validating the current deployment status, config and checking if the last
+     * reconciled spec did not become stable within the configured grace period.
+     *
+     * <p>Rollbacks are only supported to previously running resource specs with HA enabled.
+     *
+     * @param reconciliationStatus ReconciliationStatus of the resource.
+     * @param configuration Flink cluster configuration.
+     * @return True if the resource should be rolled back.
+     */
+    private boolean shouldRollBack(

Review Comment:
   @Aitozi I added the stability logic for session jobs as well, in a separate commit.
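
   For readers following the thread, the grace-period check described in the javadoc above can be sketched roughly as follows. This is a minimal illustration under assumptions: `RollbackSketch`, `shouldRollBack` and all of its parameters are hypothetical and do not mirror the actual method in this PR.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of a stability-based rollback check; not the operator's code. */
final class RollbackSketch {
    private RollbackSketch() {}

    /**
     * Returns true only when rollbacks are enabled, the last reconciled spec has
     * not been marked stable, and the grace period since reconciliation has elapsed.
     */
    static boolean shouldRollBack(
            boolean rollbackEnabled,
            boolean lastSpecStable,
            Instant lastReconciledAt,
            Duration gracePeriod,
            Instant now) {
        if (!rollbackEnabled || lastSpecStable) {
            return false;
        }
        // The spec had its chance to become stable; the deadline has passed.
        return now.isAfter(lastReconciledAt.plus(gracePeriod));
    }
}
```

   Passing `now` in explicitly keeps the check deterministic in tests; the same shape would apply to applications and session jobs alike.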





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#issuecomment-1166444887

   @Aitozi I added some further javadocs to clean up the base reconciler class and renamed classes for consistency. Please take a look.




[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
Aitozi commented on PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#issuecomment-1163019156

   Thanks for this nice work. The new hierarchy looks good to me; I will take a closer look tonight or tomorrow.




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906772118


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        if (!reconcileInternal(cr, ctx)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Try to reconcile the current {@link AbstractFlinkResource} to the desired spec.
+     *
+     * @param cr Resource being reconciled.
+     * @param ctx Current context.
+     * @return True if a reconciliation action was triggered, False if no action required.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileInternal(CR cr, Context ctx) throws Exception;
+
+    protected boolean readyToReconcile(CR cr, Context ctx) {
+        return true;
+    }
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Config to be used during deployment.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,

Review Comment:
   No, because it's not always the current CR.spec that needs to be deployed. Upgrade, rollback, etc. use different specs at different times.
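
   A minimal sketch of this point, assuming simplified stand-in types (`DeploySketch`, `Spec` and `Resource` are hypothetical and not the operator's classes): because `deploy` receives the spec explicitly, the same method can serve both an upgrade to the desired spec and a rollback to the last stable one.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in types; not the operator's real classes. */
final class DeploySketch {

    /** Minimal spec carrying only an image tag. */
    static final class Spec {
        final String image;

        Spec(String image) {
            this.image = image;
        }
    }

    /** Resource holding the desired spec and the last stable spec. */
    static final class Resource {
        final Spec desired;
        final Spec lastStable;

        Resource(Spec desired, Spec lastStable) {
            this.desired = desired;
            this.lastStable = lastStable;
        }
    }

    /** Records what was deployed, in order, so the flow can be inspected. */
    final List<String> deployed = new ArrayList<>();

    /** Takes the spec explicitly instead of reading it from the resource. */
    void deploy(Spec spec) {
        deployed.add(spec.image);
    }

    /** An upgrade deploys the spec currently requested by the user. */
    void upgrade(Resource resource) {
        deploy(resource.desired);
    }

    /** A rollback deploys the last stable spec, not the current one. */
    void rollback(Resource resource) {
        deploy(resource.lastStable);
    }
}
```

   If `deploy` took the whole resource and read its current spec, the rollback path in this sketch would need a second code path or a mutated resource.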





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906685879


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        if (!reconcileInternal(cr, ctx)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Try to reconcile the current {@link AbstractFlinkResource} to the desired spec.
+     *
+     * @param cr Resource being reconciled.
+     * @param ctx Current context.
+     * @return True if a reconciliation action was triggered, False if no action required.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileInternal(CR cr, Context ctx) throws Exception;
+
+    protected boolean readyToReconcile(CR cr, Context ctx) {
+        return true;
+    }
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Config to be used during deployment.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,
+            SPEC spec,
+            STATUS status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
+            throws Exception;
+
+    /**
+     * Shut down and clean up all Flink job/cluster resources.
+     *
+     * @param resource Resource being reconciled.
+     * @param context Current context.
+     * @return DeleteControl object.
+     */
+    protected abstract DeleteControl cleanupInternal(CR resource, Context context);
+
+    /**
+     * Checks whether the desired spec already matches the currently deployed spec. If they match
+     * the resource status is updated to reflect successful reconciliation.
+     *
+     * @param resource Resource being reconciled.
+     * @param deployConf Deploy configuration for the Flink resource.
+     * @return True if desired spec was already deployed.
+     */
+    protected boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
+        AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
+        if (resource.getSpec().equals(deployedSpec)) {
+            LOG.info(
+                    "The new spec matches the currently deployed last stable spec. No upgrade needed.");
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource,
+                    deployedSpec.getJob() != null ? deployedSpec.getJob().getState() : null,
+                    deployConf);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated. This
+     * is triggered only if, jm deployment missing, recovery config and HA enabled.
+     *
+     * @param conf Flink cluster configuration.
+     * @param deployment FlinkDeployment object.
+     * @return True if recovery should be executed.
+     */
+    protected static boolean shouldRecoverDeployment(

Review Comment:
   This does not seem to need to be static.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        if (!reconcileInternal(cr, ctx)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Try to reconcile the current {@link AbstractFlinkResource} to the desired spec.
+     *
+     * @param cr Resource being reconciled.
+     * @param ctx Current context.
+     * @return True if a reconciliation action was triggered, False if no action required.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileInternal(CR cr, Context ctx) throws Exception;
+
+    protected boolean readyToReconcile(CR cr, Context ctx) {
+        return true;
+    }
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Config to be used during deployment.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,

Review Comment:
   Could these methods work directly with the `CR` object? The meta, spec, and status can all be obtained from it.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkJobReconciler.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * Reconciler responsible for handling the job lifecycle according to the desired and current
+ * states.
+ */
+public abstract class FlinkJobReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        extends FlinkReconciler<CR, SPEC, STATUS> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobReconciler.class);
+
+    public FlinkJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        super(kubernetesClient, flinkService, configManager, eventRecorder);
+    }
+
+    @Override
+    public boolean reconcileInternal(CR resource, Context context) throws Exception {
+        var deployMeta = resource.getMetadata();
+        STATUS status = resource.getStatus();
+        var reconciliationStatus = status.getReconciliationStatus();
+        SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = resource.getSpec();
+
+        var deployConfig = getDeployConfig(deployMeta, currentDeploySpec, context);
+        if (shouldWaitForPendingSavepoint(status.getJobStatus(), deployConfig)) {
+            LOG.info("Delaying job reconciliation until pending savepoint is completed.");
+            return true;

Review Comment:
   The meaning of the return value does not match the Javadoc: no action is actually triggered here. I think the return value really indicates whether to skip the remaining reconcile steps.
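
A minimal sketch of the mismatch, under simplifying assumptions: `ReturnValueSketch` and its two boolean parameters are hypothetical, and only the log message is taken from the diff. The method returns `true` while a savepoint is pending even though nothing is triggered, so in practice `true` reads as "stop here" rather than "an action was triggered".

```java
// Hypothetical simplification of the control flow; not the operator's real code.
final class ReturnValueSketch {

    // Stand-in for reconcileInternal: true effectively means "stop processing here".
    static boolean reconcileInternal(boolean savepointPending, boolean specChanged) {
        if (savepointPending) {
            // No action is triggered, yet true is returned to skip the later steps.
            return true;
        }
        if (specChanged) {
            // An upgrade would be triggered at this point.
            return true;
        }
        return false;
    }

    // Stand-in for the caller: it only reports "nothing to do" when false comes back.
    static String reconcile(boolean savepointPending, boolean specChanged) {
        return reconcileInternal(savepointPending, specChanged)
                ? "stopped"
                : "Resource fully reconciled, nothing to do...";
    }
}
```

In this sketch the pending-savepoint case and the upgrade case are indistinguishable to the caller, which is why the Javadoc wording may need to change.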



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkJobReconciler.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * Reconciler responsible for handling the job lifecycle according to the desired and current
+ * states.
+ */
+public abstract class FlinkJobReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        extends FlinkReconciler<CR, SPEC, STATUS> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobReconciler.class);
+
+    public FlinkJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        super(kubernetesClient, flinkService, configManager, eventRecorder);
+    }
+
+    @Override
+    public boolean reconcileInternal(CR resource, Context context) throws Exception {
+        var deployMeta = resource.getMetadata();
+        STATUS status = resource.getStatus();
+        var reconciliationStatus = status.getReconciliationStatus();
+        SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = resource.getSpec();
+
+        var deployConfig = getDeployConfig(deployMeta, currentDeploySpec, context);
+        if (shouldWaitForPendingSavepoint(status.getJobStatus(), deployConfig)) {
+            LOG.info("Delaying job reconciliation until pending savepoint is completed.");
+            return true;
+        }
+
+        var observeConfig = getObserveConfig(resource, context);
+        boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
+        if (specChanged) {
+            if (checkNewSpecAlreadyDeployed(resource, deployConfig)) {
+                return true;
+            }
+            LOG.debug("Detected spec change, starting upgrade process.");
+            JobState currentJobState = lastReconciledSpec.getJob().getState();
+            JobState desiredJobState = currentDeploySpec.getJob().getState();
+            JobState newState = currentJobState;
+            if (currentJobState == JobState.RUNNING) {
+                if (desiredJobState == JobState.RUNNING) {
+                    LOG.info("Upgrading/Restarting running job, suspending first...");
+                }
+                Optional<UpgradeMode> availableUpgradeMode =
+                        getAvailableUpgradeMode(resource, deployConfig, observeConfig);
+                if (availableUpgradeMode.isEmpty()) {
+                    return true;
+                }
+                // We must record the upgrade mode used to the status later
+                currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
+                cancelJob(resource, availableUpgradeMode.get(), observeConfig);
+                newState = JobState.SUSPENDED;
+            }
+            if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
+                restoreJob(
+                        deployMeta,
+                        currentDeploySpec,
+                        status,
+                        deployConfig,
+                        // We decide to enforce HA based on how job was previously suspended
+                        lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
+                newState = JobState.RUNNING;
+            }
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource, newState, deployConfig);
+        } else if (shouldRollBack(reconciliationStatus, observeConfig)) {

Review Comment:
   Do `shouldRollBack` and `rollbackApplication` need to be abstract methods so that the Application and SessionJob reconcilers can each implement them?
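
For reference, a rough sketch of the alternative this question describes, with hypothetical class and method names throughout (`JobReconcilerSketch`, `ApplicationSketch`, `SessionJobSketch`). The shared flow stays in the base class, and the rollback decision and action become abstract hooks. This is only one possible shape, not what the PR does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is not the operator's code.
abstract class JobReconcilerSketch {
    final List<String> actions = new ArrayList<>();

    // The shared flow lives in the base class.
    final void reconcile(boolean specChanged) {
        if (specChanged) {
            actions.add("upgrade");
        } else if (shouldRollBack()) {
            rollback();
        }
    }

    // Hooks that each resource type would implement if they were abstract.
    abstract boolean shouldRollBack();

    abstract void rollback();
}

final class ApplicationSketch extends JobReconcilerSketch {
    @Override
    boolean shouldRollBack() {
        return true;
    }

    @Override
    void rollback() {
        actions.add("rollback application");
    }
}

final class SessionJobSketch extends JobReconcilerSketch {
    @Override
    boolean shouldRollBack() {
        return false;
    }

    @Override
    void rollback() {
        actions.add("rollback session job");
    }
}
```

The trade-off is that abstract hooks let each resource type diverge, whereas keeping the logic in the base class keeps the two paths identical.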



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(

Review Comment:
   nit: I think moving this to the subclass would be more accurate. Although `ReconciliationUtils.updateForSpecReconciliationSuccess` can handle the session cluster, the `FlinkReconciler` does not have to set the JobState.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906991319


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Base class for all Flink resource reconcilers. It contains the general flow of reconciling Flink
+ * related resources including initial deployments, upgrades, rollbacks etc.
+ */
+public abstract class AbstractFlinkResourceReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public AbstractFlinkResourceReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        var spec = cr.getSpec();
+        var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+        var status = cr.getStatus();
+
+        // If the resource is not ready for reconciliation we simply return
+        if (!readyToReconcile(cr, ctx, deployConfig)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var firstDeployment = status.getReconciliationStatus().getLastReconciledSpec() == null;
+
+        // If this is the first deployment for the resource we simply submit the job and return.
+        // No further logic is required at this point.
+        if (firstDeployment) {
+            LOG.info("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        var reconciliationStatus = cr.getStatus().getReconciliationStatus();
+        SPEC lastReconciledSpec =
+                cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = cr.getSpec();
+
+        boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
+        var observeConfig = getObserveConfig(cr, ctx);
+
+        if (specChanged) {
+            if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
+                return;
+            }
+            LOG.info("Reconciling spec change");
+            reconcileSpecChange(cr, observeConfig, deployConfig);
+        } else if (shouldRollBack(reconciliationStatus, observeConfig)) {
+            // Rollbacks are executed in two steps, we initiate it first then return
+            if (initiateRollBack(status)) {
+                return;
+            }
+            LOG.warn("Executing rollback operation");
+            rollback(cr, ctx, observeConfig);
+        } else if (!reconcileOtherChanges(cr, observeConfig)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Deployment configuration.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Get Flink configuration for client interactions with the running Flink deployment/session
+     * job.
+     *
+     * @param resource Related Flink resource.
+     * @param context Reconciliation context.
+     * @return Observe configuration.
+     */
+    protected abstract Configuration getObserveConfig(CR resource, Context context);
+
+    /**
+     * Check whether the given Flink resource is ready to be reconciled or we are still waiting for
+     * any pending operation or condition first.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param deployConfig Deployment configuration.
+     * @return True if the resource is ready to be reconciled.
+     */
+    protected abstract boolean readyToReconcile(CR cr, Context ctx, Configuration deployConfig);
+
+    /**
+     * Reconcile spec upgrade on the currently deployed/suspended Flink resource and update the
+     * status accordingly.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @param deployConfig Deployment configuration.
+     * @throws Exception
+     */
+    protected abstract void reconcileSpecChange(
+            CR cr, Configuration observeConfig, Configuration deployConfig) throws Exception;
+
+    /**
+     * Rollback deployed resource to the last stable spec.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param observeConfig Observe configuration.
+     * @throws Exception
+     */
+    protected abstract void rollback(CR cr, Context ctx, Configuration observeConfig)
+            throws Exception;
+
+    /**
+     * Reconcile any other changes required for this resource that are specific to the reconciler
+     * implementation.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @return True if any further reconciliation action was taken.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileOtherChanges(CR cr, Configuration observeConfig)
+            throws Exception;
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,
+            SPEC spec,
+            STATUS status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
+            throws Exception;
+
+    /**
+     * Shut down and clean up all Flink job/cluster resources.
+     *
+     * @param resource Resource being reconciled.
+     * @param context Current context.
+     * @return DeleteControl object.
+     */
+    protected abstract DeleteControl cleanupInternal(CR resource, Context context);
+
+    /**
+     * Checks whether the desired spec already matches the currently deployed spec. If they match
+     * the resource status is updated to reflect successful reconciliation.
+     *
+     * @param resource Resource being reconciled.
+     * @param deployConf Deploy configuration for the Flink resource.
+     * @return True if desired spec was already deployed.
+     */
+    private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
+        AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
+        if (resource.getSpec().equals(deployedSpec)) {
+            LOG.info(
+                    "The new spec matches the currently deployed last stable spec. No upgrade needed.");
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource,
+                    deployedSpec.getJob() != null ? deployedSpec.getJob().getState() : null,
+                    deployConf);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the currently deployed Flink resource spec should be rolled back to the stable
+     * spec. This includes validating the current deployment status, config and checking if the last
+     * reconciled spec did not become stable within the configured grace period.
+     *
+     * <p>Rollbacks are only supported to previously running resource specs with HA enabled.
+     *
+     * @param reconciliationStatus ReconciliationStatus of the resource.
+     * @param configuration Flink cluster configuration.
+     * @return True if the resource should be rolled back to the last stable spec.
+     */
+    private boolean shouldRollBack(

Review Comment:
   IIUC, we also enable the rollback feature here for the session job, right? But we still do not have mark-stable logic for it yet.
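
To make the concern concrete, here is a minimal sketch of the kind of check the `shouldRollBack` Javadoc describes: roll back only if the last reconciled spec has not become stable within a grace period. The class, parameter names, and boolean flags below are assumptions for illustration and are not the operator's actual implementation.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch; names and parameters are illustrative only.
final class RollbackSketch {

    // Returns true when a rollback should be initiated.
    static boolean shouldRollBack(
            boolean rollbackEnabled,   // assumed config switch
            boolean lastSpecStable,    // assumed "marked stable" flag
            Instant reconciledAt,      // when the last spec was reconciled
            Duration gracePeriod,      // assumed configured grace period
            Instant now) {
        if (!rollbackEnabled || lastSpecStable) {
            return false;
        }
        // Not stable yet: roll back only once the grace period has elapsed.
        return now.isAfter(reconciledAt.plus(gracePeriod));
    }
}
```

In a check shaped like this, a resource type with nothing that ever sets the stable flag would trigger a rollback for every upgrade once the grace period passes, which is why the mark-stable logic matters for session jobs.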





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#issuecomment-1162750385

   cc @Aitozi 




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#issuecomment-1167041418

   If there are no more comments, @Aitozi, I will merge this.




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#issuecomment-1166364561

   @Aitozi thanks for the review. I have managed to move further methods into the `FlinkReconciler` and make the subclass implementations a little more modular with new methods: `reconcileSpecChange`, `rollback`, etc.
   
   I will continue addressing your other comments tomorrow/Monday.
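
The structure described here is essentially a template method: the base class owns a `final` reconcile flow and the subclasses only supply hooks. A minimal sketch, assuming simplified hypothetical signatures (the class names, the `String` spec type, and the `actions` list are illustrative and not part of the PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified stand-in for the reconciler base class.
// Names and signatures are illustrative, not the operator's real API.
abstract class SketchReconciler<SPEC> {
    // Records which hook ran, purely so the flow can be inspected.
    public final List<String> actions = new ArrayList<>();

    // Fixed flow owned by the base class; subclasses cannot override it.
    public final void reconcile(SPEC desired, SPEC lastReconciled, boolean shouldRollBack) {
        if (lastReconciled == null) {
            deploy(desired); // first deployment
        } else if (!Objects.equals(desired, lastReconciled)) {
            reconcileSpecChange(desired); // spec upgrade
        } else if (shouldRollBack) {
            rollback();
        } else if (!reconcileOtherChanges()) {
            actions.add("noop"); // fully reconciled, nothing to do
        }
    }

    protected abstract void deploy(SPEC spec);

    protected abstract void reconcileSpecChange(SPEC spec);

    protected abstract void rollback();

    protected abstract boolean reconcileOtherChanges();
}

// Hypothetical subclass: only the resource-specific steps live here.
class SketchSessionJobReconciler extends SketchReconciler<String> {
    @Override
    protected void deploy(String spec) {
        actions.add("deploy:" + spec);
    }

    @Override
    protected void reconcileSpecChange(String spec) {
        actions.add("upgrade:" + spec);
    }

    @Override
    protected void rollback() {
        actions.add("rollback");
    }

    @Override
    protected boolean reconcileOtherChanges() {
        return false;
    }
}
```

Keeping the flow `final` in the base class means application and session job reconcilers cannot drift apart in ordering (first deploy, spec change, rollback, other changes); they differ only in what each hook does.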




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906772317


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(

Review Comment:
   I would like to keep it here to avoid adding extra boilerplate. During initial deployment this is always running anyway.





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#issuecomment-1165465785

   Cc @morhidi @wangyang0918 

