Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/12 17:46:12 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #400: [FLINK-29401] Refactor observer structure

gyfora opened a new pull request, #400:
URL: https://github.com/apache/flink-kubernetes-operator/pull/400

   ## What is the purpose of the change
   
   The goal is to create a shared observer hierarchy for all Flink resources and factor the common logic out into base classes, similar to how it is already done for Reconcilers.
   
   ## Brief change log
   
    - Create AbstractFlinkResourceObserver for shared logic
    - Other refactorings in sessionjob and application observer contexts
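
A minimal sketch of the shape such a shared hierarchy might take: a template-method base class owns the common control flow, and each resource type fills in only its own steps. Every name below (`Resource`, `BaseObserver`, `SessionJobSketchObserver`) is a hypothetical stand-in for illustration, not one of the operator's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

final class ObserverHierarchySketch {

    /** Hypothetical stand-in for a Flink custom resource. */
    static final class Resource {
        final boolean deployed;
        final List<String> log = new ArrayList<>();

        Resource(boolean deployed) {
            this.deployed = deployed;
        }
    }

    /** Shared base class: the observe() flow is fixed, subclasses supply the details. */
    abstract static class BaseObserver<CTX> {

        public final void observe(Resource resource) {
            CTX ctx = getObserverContext(resource);
            if (!isReadyToBeObserved(resource)) {
                resource.log.add("skipped");
                return;
            }
            observeInternal(resource, ctx);
            // Common post-processing shared by all resource types
            resource.log.add("post-observe");
        }

        protected boolean isReadyToBeObserved(Resource resource) {
            return resource.deployed;
        }

        protected abstract CTX getObserverContext(Resource resource);

        protected abstract void observeInternal(Resource resource, CTX ctx);
    }

    /** One concrete observer; here the context is just a String for brevity. */
    static final class SessionJobSketchObserver extends BaseObserver<String> {
        @Override
        protected String getObserverContext(Resource resource) {
            return "session-config";
        }

        @Override
        protected void observeInternal(Resource resource, String ctx) {
            resource.log.add("observed with " + ctx);
        }
    }

    /** Runs one observe cycle and returns what was recorded. */
    static List<String> run(boolean deployed) {
        Resource resource = new Resource(deployed);
        new SessionJobSketchObserver().observe(resource);
        return resource.log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

Making `observe` final keeps the skip checks and post-processing in one place, so a new resource type cannot accidentally bypass them.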
   
   ## Verifying this change
   
   
   This change is already covered by existing observer/controller tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no
     - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #400: [FLINK-29401] Refactor observer structure

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #400:
URL: https://github.com/apache/flink-kubernetes-operator/pull/400




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #400: [FLINK-29401] Refactor observer structure

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #400:
URL: https://github.com/apache/flink-kubernetes-operator/pull/400#issuecomment-1276529737

   cc @morhidi @gaborgsomogyi 




[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #400: [FLINK-29401] Refactor observer structure

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #400:
URL: https://github.com/apache/flink-kubernetes-operator/pull/400#discussion_r995447579


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverContext.java:
##########
@@ -15,9 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.observer.context;
+package org.apache.flink.kubernetes.operator.observer;
 
-/** An empty observer context. */
-public class VoidObserverContext {

Review Comment:
   This is weird indeed.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #400: [FLINK-29401] Refactor observer structure

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #400:
URL: https://github.com/apache/flink-kubernetes-operator/pull/400#discussion_r995459259


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverContext.java:
##########
@@ -15,9 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.observer.context;
+package org.apache.flink.kubernetes.operator.observer;
 
-/** An empty observer context. */
-public class VoidObserverContext {

Review Comment:
   It's not that weird for some context implementations to be empty when no extra contextual information is needed. In any case, this is removed now :)
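
To illustrate the point about empty context implementations, here is a minimal sketch, assuming a marker-style context interface. The types `SketchContext`, `EmptyContext`, and `ConfigContext` are hypothetical and are not the operator's real `ObserverContext` classes.

```java
final class EmptyContextSketch {

    /** Hypothetical marker interface for per-observe contextual data. */
    interface SketchContext {}

    /** An empty context: valid when an observer needs no extra information. */
    static final class EmptyContext implements SketchContext {
        static final EmptyContext INSTANCE = new EmptyContext();

        private EmptyContext() {}
    }

    /** A context that carries data, here a single (assumed) config value. */
    static final class ConfigContext implements SketchContext {
        final String deployedConfig;

        ConfigContext(String deployedConfig) {
            this.deployedConfig = deployedConfig;
        }
    }

    /** Callers share one signature regardless of how much context exists. */
    static String describe(SketchContext ctx) {
        if (ctx instanceof ConfigContext) {
            return "config=" + ((ConfigContext) ctx).deployedConfig;
        }
        return "no extra context";
    }

    public static void main(String[] args) {
        System.out.println(describe(EmptyContext.INSTANCE));
        System.out.println(describe(new ConfigContext("parallelism: 2")));
    }
}
```

The trade-off is an extra type per observer in exchange for one uniform method signature across the hierarchy.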





[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #400: [FLINK-29401] Refactor observer structure

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #400:
URL: https://github.com/apache/flink-kubernetes-operator/pull/400#discussion_r994577049


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverContext.java:
##########
@@ -15,9 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.observer.context;
+package org.apache.flink.kubernetes.operator.observer;
 
-/** An empty observer context. */
-public class VoidObserverContext {

Review Comment:
   Just for my own understanding: why did we have `VoidObserverContext`? It doesn't seem to do anything meaningful.
   
   The other thing I don't understand is why we introduce another layer to pass `DeployedConfig`. Having tons of function parameters would justify it, but that doesn't seem to be the case here.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Base observer for all Flink resources. */
+public abstract class AbstractFlinkResourceObserver<
+                CR extends AbstractFlinkResource<?, ?>, CTX extends ObserverContext>
+        implements Observer<CR> {
+
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+
+    public AbstractFlinkResourceObserver(
+            FlinkConfigManager configManager, EventRecorder eventRecorder) {
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void observe(CR resource, Context<?> context) {
+        var observerContext = getObserverContext(resource, context);
+
+        if (!isResourceReadyToBeObserved(resource, context, observerContext)) {
+            return;
+        }
+
+        // Trigger resource specific observe logic
+        observeInternal(resource, context, observerContext);
+
+        SavepointUtils.resetTriggerIfJobNotRunning(resource, eventRecorder);
+    }
+
+    /**
+     * Get the observer context for the current resource.
+     *
+     * @param resource Resource being observed
+     * @param context Resource context
+     * @return Observer context
+     */
+    protected abstract CTX getObserverContext(CR resource, Context<?> context);
+
+    /**
+     * Check whether the resource should be observed. In certain states such as suspended
+     * applications or in-progress upgrades and rollbacks, observing is not necessary.
+     *
+     * @param resource Current resource
+     * @param resourceContext Resource context
+     * @param observerContext Observer context
+     * @return True if we should observe the resource
+     */
+    protected boolean isResourceReadyToBeObserved(
+            CR resource, Context<?> resourceContext, CTX observerContext) {
+        var reconciliationStatus = resource.getStatus().getReconciliationStatus();
+
+        if (reconciliationStatus.isBeforeFirstDeployment()) {
+            logger.debug("Skipping observe before first deployment");
+            return false;
+        }
+
+        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+            logger.debug("Skipping observe during rollback operation");
+            return false;
+        }
+
+        // We are in the middle or possibly right after an upgrade
+        if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {

Review Comment:
   Nit: `reconciliationStatus.getState()` can be extracted into a local variable.
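
A hedged sketch of what that extraction could look like. One caveat: the full method re-reads the state after calling `checkIfAlreadyUpgraded`, which may update the status, so a cached local must not be reused for that second check. The types below (`State`, `Status`, `markDeployedIf`) are simplified, hypothetical stand-ins rather than the operator's real classes.

```java
final class LocalStateSketch {

    enum State { DEPLOYED, UPGRADING, ROLLING_BACK }

    /** Hypothetical, simplified reconciliation status holder. */
    static final class Status {
        private State state;

        Status(State state) {
            this.state = state;
        }

        State getState() {
            return state;
        }

        void setState(State state) {
            this.state = state;
        }
    }

    /** Stand-in for checkIfAlreadyUpgraded: may flip the status to DEPLOYED. */
    static void markDeployedIf(Status status, boolean alreadyUpgraded) {
        if (alreadyUpgraded) {
            status.setState(State.DEPLOYED);
        }
    }

    static boolean isReady(Status status, boolean alreadyUpgraded) {
        // Read once and reuse for the checks that cannot mutate the status
        State state = status.getState();

        if (state == State.ROLLING_BACK) {
            return false;
        }

        if (state == State.UPGRADING) {
            markDeployedIf(status, alreadyUpgraded);
            // Re-read here: the call above may have changed the state,
            // so the cached local would be stale
            if (status.getState() == State.UPGRADING) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isReady(new Status(State.UPGRADING), true));
        System.out.println(isReady(new Status(State.UPGRADING), false));
    }
}
```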



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Base observer for all Flink resources. */
+public abstract class AbstractFlinkResourceObserver<
+                CR extends AbstractFlinkResource<?, ?>, CTX extends ObserverContext>
+        implements Observer<CR> {
+
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+
+    public AbstractFlinkResourceObserver(
+            FlinkConfigManager configManager, EventRecorder eventRecorder) {
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void observe(CR resource, Context<?> context) {
+        var observerContext = getObserverContext(resource, context);
+
+        if (!isResourceReadyToBeObserved(resource, context, observerContext)) {
+            return;
+        }
+
+        // Trigger resource specific observe logic
+        observeInternal(resource, context, observerContext);
+
+        SavepointUtils.resetTriggerIfJobNotRunning(resource, eventRecorder);
+    }
+
+    /**
+     * Get the observer context for the current resource.
+     *
+     * @param resource Resource being observed
+     * @param context Resource context
+     * @return Observer context
+     */
+    protected abstract CTX getObserverContext(CR resource, Context<?> context);
+
+    /**
+     * Check whether the resource should be observed. In certain states such as suspended
+     * applications or in-progress upgrades and rollbacks, observing is not necessary.
+     *
+     * @param resource Current resource
+     * @param resourceContext Resource context
+     * @param observerContext Observer context
+     * @return True if we should observe the resource
+     */
+    protected boolean isResourceReadyToBeObserved(
+            CR resource, Context<?> resourceContext, CTX observerContext) {
+        var reconciliationStatus = resource.getStatus().getReconciliationStatus();
+
+        if (reconciliationStatus.isBeforeFirstDeployment()) {
+            logger.debug("Skipping observe before first deployment");
+            return false;
+        }
+
+        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+            logger.debug("Skipping observe during rollback operation");
+            return false;
+        }
+
+        // We are in the middle or possibly right after an upgrade
+        if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+            // We must check if the upgrade went through without the status upgrade for some reason
+            checkIfAlreadyUpgraded(resource, resourceContext, observerContext);
+            if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+                ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(resource);
+                logger.debug("Skipping observe before resource is deployed during upgrade");
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Internal observer logic specific to each resource type.
+     *
+     * @param resource Resource to be observed
+     * @param resourceContext Resource context
+     * @param observerContext Observer context
+     */
+    protected abstract void observeInternal(
+            CR resource, Context<?> resourceContext, CTX observerContext);
+
+    /**
+     * Checks a resource that is currently in the UPGRADING state whether it was already deployed
+     * but we simply miss the status information. After comparing the target resource generation
+     * with the one from the possible deployment if they match we update the status to the already
+     * DEPLOYED state.
+     *
+     * @param resource Flink resource.
+     * @param resourceContext Context for resource.
+     * @param observerContext Context for observer.
+     */
+    protected abstract void checkIfAlreadyUpgraded(

Review Comment:
   Maybe this should be called `updateStatusToDeployedIfAlreadyUpgraded` or something more meaningful. Without reading the method's Javadoc I can't really tell what its outcome is.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -132,9 +133,9 @@ private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobS
     /**
      * Callback when list jobs timeout.
      *
-     * @param ctx Observe context.
+     * @param observerContext Observe context.
      */
-    protected abstract void onTimeout(CTX ctx);
+    protected abstract void onTimeout(R resource, Context<?> resourceContext, CTX observerContext);

Review Comment:
   Nit: `resource` is not documented.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -48,66 +48,72 @@ public ApplicationObserver(
         super(flinkService, configManager, eventRecorder);
         this.savepointObserver =
                 new SavepointObserver<>(flinkService, configManager, eventRecorder);
-        this.jobStatusObserver =

Review Comment:
   Big +1 for collapsing this huge inline implementation.


