You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/27 09:00:16 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28008] Can not get secondary resource from context after operator restart

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a4e6a5  [FLINK-28008] Can not get secondary resource from context after operator restart
6a4e6a5 is described below

commit 6a4e6a5edb386675fede2790b81e31cdbcb2952c
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Fri Jun 24 10:28:02 2022 +0200

    [FLINK-28008] Can not get secondary resource from context after operator restart
---
 e2e-tests/test_sessionjob_operations.sh            | 13 +++++++++++
 .../flink/kubernetes/operator/FlinkOperator.java   | 13 ++---------
 .../operator/config/FlinkConfigManager.java        |  3 ++-
 .../operator/utils/EventSourceUtils.java           | 26 ++++++++++++++++------
 .../src/main/resources/log4j2.properties           |  3 +++
 pom.xml                                            |  2 +-
 6 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/e2e-tests/test_sessionjob_operations.sh b/e2e-tests/test_sessionjob_operations.sh
index e03604d..a4bbad7 100755
--- a/e2e-tests/test_sessionjob_operations.sh
+++ b/e2e-tests/test_sessionjob_operations.sh
@@ -28,6 +28,7 @@ TIMEOUT=300
 SESSION_CLUSTER_IDENTIFIER="flinkdep/$CLUSTER_ID"
 SESSION_JOB_NAME="flink-example-statemachine"
 SESSION_JOB_IDENTIFIER="sessionjob/$SESSION_JOB_NAME"
+OPERATOR_POD_LABEL="app.kubernetes.io/name=flink-kubernetes-operator"
 
 on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID
 
@@ -65,3 +66,15 @@ assert_available_slots 1 $CLUSTER_ID
 
 echo "Successfully run the sessionjob savepoint upgrade test"
 
+# Test Operator restart
+echo "Delete session job " + $SESSION_JOB_NAME
+kubectl delete flinksessionjob $SESSION_JOB_NAME
+echo "Killing the operator pod"
+kubectl delete pod -l $OPERATOR_POD_LABEL
+echo "Submitting the session job again"
+retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
+
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
+wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 38ed58f..5e93866 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -57,7 +57,6 @@ import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Executors;
 
 /** Main Class for Flink native k8s operator. */
@@ -151,16 +150,8 @@ public class FlinkOperator {
     }
 
     private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
-        // TODO: https://github.com/java-operator-sdk/java-operator-sdk/issues/1259
-        String[] watchedNamespaces =
-                configManager
-                        .getOperatorConfiguration()
-                        .getWatchedNamespaces()
-                        .toArray(String[]::new);
-        String fakeNs = UUID.randomUUID().toString();
-        overrider.settingNamespace(fakeNs);
-        overrider.addingNamespaces(watchedNamespaces);
-        overrider.removingNamespaces(fakeNs);
+        overrider.settingNamespaces(
+                configManager.getOperatorConfiguration().getWatchedNamespaces());
         overrider.withRetry(
                 GenericRetry.fromConfiguration(
                         configManager.getOperatorConfiguration().getRetryConfiguration()));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index 8f67978..0895f29 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -122,9 +122,10 @@ public class FlinkConfigManager {
                 && this.defaultConfig.toMap().equals(newConf.toMap())) {
             LOG.info("Default configuration did not change, nothing to do...");
             return;
+        } else {
+            LOG.info("Setting default configuration to {}", newConf);
         }
 
-        LOG.info("Updating default configuration to {}", newConf);
         var oldNs =
                 Optional.ofNullable(this.operatorConfiguration)
                         .map(FlinkOperatorConfiguration::getWatchedNamespaces)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index 4771723..62bea6d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -27,11 +27,13 @@ import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
 import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
 import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /** Utility class to locate secondary resources. */
@@ -71,7 +73,7 @@ public class EventSourceUtils {
                         FLINK_DEPLOYMENT_IDX,
                         flinkDeployment ->
                                 List.of(
-                                        compositeKey(
+                                        indexKey(
                                                 flinkDeployment.getMetadata().getName(),
                                                 flinkDeployment.getMetadata().getNamespace())));
 
@@ -82,7 +84,7 @@ public class EventSourceUtils {
                                         context.getPrimaryCache()
                                                 .byIndex(
                                                         FLINK_DEPLOYMENT_IDX,
-                                                        compositeKey(
+                                                        indexKey(
                                                                 sessionJob
                                                                         .getSpec()
                                                                         .getDeploymentName(),
@@ -106,7 +108,7 @@ public class EventSourceUtils {
                         FLINK_SESSIONJOB_IDX,
                         sessionJob ->
                                 List.of(
-                                        compositeKey(
+                                        indexKey(
                                                 sessionJob.getSpec().getDeploymentName(),
                                                 sessionJob.getMetadata().getNamespace())));
 
@@ -117,7 +119,7 @@ public class EventSourceUtils {
                                         context.getPrimaryCache()
                                                 .byIndex(
                                                         FLINK_SESSIONJOB_IDX,
-                                                        compositeKey(
+                                                        indexKey(
                                                                 flinkDeployment
                                                                         .getMetadata()
                                                                         .getName(),
@@ -127,14 +129,24 @@ public class EventSourceUtils {
                                                 .stream()
                                                 .map(ResourceID::fromResource)
                                                 .collect(Collectors.toSet()))
+                        .withPrimaryToSecondaryMapper(
+                                (PrimaryToSecondaryMapper<FlinkSessionJob>)
+                                        sessionJob ->
+                                                Set.of(
+                                                        new ResourceID(
+                                                                sessionJob
+                                                                        .getSpec()
+                                                                        .getDeploymentName(),
+                                                                sessionJob
+                                                                        .getMetadata()
+                                                                        .getNamespace())))
                         .withNamespacesInheritedFromController(context)
                         .followNamespaceChanges(true)
                         .build();
-
         return new InformerEventSource<>(configuration, context);
     }
 
-    private static String compositeKey(String name, String namespace) {
-        return String.format("%s_%s", name, namespace);
+    private static String indexKey(String name, String namespace) {
+        return name + "#" + namespace;
     }
 }
diff --git a/flink-kubernetes-operator/src/main/resources/log4j2.properties b/flink-kubernetes-operator/src/main/resources/log4j2.properties
index c3d3c15..f7f24f4 100644
--- a/flink-kubernetes-operator/src/main/resources/log4j2.properties
+++ b/flink-kubernetes-operator/src/main/resources/log4j2.properties
@@ -24,3 +24,6 @@ appender.console.name = ConsoleAppender
 appender.console.type = CONSOLE
 appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] [%X{resource.namespace}.%X{resource.name}] %msg%n%throwable}
+
+logger.conf.name = org.apache.flink.configuration.GlobalConfiguration
+logger.conf.level = WARN
diff --git a/pom.xml b/pom.xml
index 052074e..981815f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@ under the License.
         <maven-resources-plugin.version>3.2.0</maven-resources-plugin.version>
         <git-commit-id-maven-plugin.version>5.0.0</git-commit-id-maven-plugin.version>
 
-        <operator.sdk.version>3.0.2</operator.sdk.version>
+        <operator.sdk.version>3.0.3</operator.sdk.version>
         <operator.sdk.admission-controller.version>0.2.0</operator.sdk.admission-controller.version>
 
         <fabric8.version>5.12.2</fabric8.version>