You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/26 07:46:05 UTC

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #274: [FLINK-28180] Unify Application and SessionJob reconciler logic

gyfora commented on code in PR #274:
URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906772118


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/FlinkReconciler.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/** Base class for all Flink resource reconcilers. */
+public abstract class FlinkReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public FlinkReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        if (!readyToReconcile(cr, ctx)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var status = cr.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            var spec = cr.getSpec();
+            var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+
+            LOG.debug("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        if (!reconcileInternal(cr, ctx)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Try to reconcile the current {@link AbstractFlinkResource} to the desired spec.
+     *
+     * @param cr Resource being reconciled.
+     * @param ctx Current context.
+     * @return True if a reconciliation action was triggered, False if no action required.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileInternal(CR cr, Context ctx) throws Exception;
+
+    protected boolean readyToReconcile(CR cr, Context ctx) {
+        return true;
+    }
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Config to be used during deployment.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,

Review Comment:
   No because it's not always the current CR.spec that needs to be deployed. Upgrade/rollback etc use different specs at different times.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org