You are viewing a plain-text version of this content. The canonical version is the original message in the commits@flink.apache.org archive (the hyperlink was not preserved in this copy).
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/27 09:00:16 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28008] Can not get secondary resource from context after operator restart
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 6a4e6a5 [FLINK-28008] Can not get secondary resource from context after operator restart
6a4e6a5 is described below
commit 6a4e6a5edb386675fede2790b81e31cdbcb2952c
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Fri Jun 24 10:28:02 2022 +0200
[FLINK-28008] Can not get secondary resource from context after operator restart
---
e2e-tests/test_sessionjob_operations.sh | 13 +++++++++++
.../flink/kubernetes/operator/FlinkOperator.java | 13 ++---------
.../operator/config/FlinkConfigManager.java | 3 ++-
.../operator/utils/EventSourceUtils.java | 26 ++++++++++++++++------
.../src/main/resources/log4j2.properties | 3 +++
pom.xml | 2 +-
6 files changed, 40 insertions(+), 20 deletions(-)
diff --git a/e2e-tests/test_sessionjob_operations.sh b/e2e-tests/test_sessionjob_operations.sh
index e03604d..a4bbad7 100755
--- a/e2e-tests/test_sessionjob_operations.sh
+++ b/e2e-tests/test_sessionjob_operations.sh
@@ -28,6 +28,7 @@ TIMEOUT=300
SESSION_CLUSTER_IDENTIFIER="flinkdep/$CLUSTER_ID"
SESSION_JOB_NAME="flink-example-statemachine"
SESSION_JOB_IDENTIFIER="sessionjob/$SESSION_JOB_NAME"
+OPERATOR_POD_LABEL="app.kubernetes.io/name=flink-kubernetes-operator"
on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID
@@ -65,3 +66,15 @@ assert_available_slots 1 $CLUSTER_ID
echo "Successfully run the sessionjob savepoint upgrade test"
+# Test Operator restart
+echo "Delete session job " + $SESSION_JOB_NAME
+kubectl delete flinksessionjob $SESSION_JOB_NAME
+echo "Killing the operator pod"
+kubectl delete pod -l $OPERATOR_POD_LABEL
+echo "Submitting the session job again"
+retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
+
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
+wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 38ed58f..5e93866 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -57,7 +57,6 @@ import javax.annotation.Nullable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.Executors;
/** Main Class for Flink native k8s operator. */
@@ -151,16 +150,8 @@ public class FlinkOperator {
}
private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
- // TODO: https://github.com/java-operator-sdk/java-operator-sdk/issues/1259
- String[] watchedNamespaces =
- configManager
- .getOperatorConfiguration()
- .getWatchedNamespaces()
- .toArray(String[]::new);
- String fakeNs = UUID.randomUUID().toString();
- overrider.settingNamespace(fakeNs);
- overrider.addingNamespaces(watchedNamespaces);
- overrider.removingNamespaces(fakeNs);
+ overrider.settingNamespaces(
+ configManager.getOperatorConfiguration().getWatchedNamespaces());
overrider.withRetry(
GenericRetry.fromConfiguration(
configManager.getOperatorConfiguration().getRetryConfiguration()));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index 8f67978..0895f29 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -122,9 +122,10 @@ public class FlinkConfigManager {
&& this.defaultConfig.toMap().equals(newConf.toMap())) {
LOG.info("Default configuration did not change, nothing to do...");
return;
+ } else {
+ LOG.info("Setting default configuration to {}", newConf);
}
- LOG.info("Updating default configuration to {}", newConf);
var oldNs =
Optional.ofNullable(this.operatorConfiguration)
.map(FlinkOperatorConfiguration::getWatchedNamespaces)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index 4771723..62bea6d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -27,11 +27,13 @@ import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/** Utility class to locate secondary resources. */
@@ -71,7 +73,7 @@ public class EventSourceUtils {
FLINK_DEPLOYMENT_IDX,
flinkDeployment ->
List.of(
- compositeKey(
+ indexKey(
flinkDeployment.getMetadata().getName(),
flinkDeployment.getMetadata().getNamespace())));
@@ -82,7 +84,7 @@ public class EventSourceUtils {
context.getPrimaryCache()
.byIndex(
FLINK_DEPLOYMENT_IDX,
- compositeKey(
+ indexKey(
sessionJob
.getSpec()
.getDeploymentName(),
@@ -106,7 +108,7 @@ public class EventSourceUtils {
FLINK_SESSIONJOB_IDX,
sessionJob ->
List.of(
- compositeKey(
+ indexKey(
sessionJob.getSpec().getDeploymentName(),
sessionJob.getMetadata().getNamespace())));
@@ -117,7 +119,7 @@ public class EventSourceUtils {
context.getPrimaryCache()
.byIndex(
FLINK_SESSIONJOB_IDX,
- compositeKey(
+ indexKey(
flinkDeployment
.getMetadata()
.getName(),
@@ -127,14 +129,24 @@ public class EventSourceUtils {
.stream()
.map(ResourceID::fromResource)
.collect(Collectors.toSet()))
+ .withPrimaryToSecondaryMapper(
+ (PrimaryToSecondaryMapper<FlinkSessionJob>)
+ sessionJob ->
+ Set.of(
+ new ResourceID(
+ sessionJob
+ .getSpec()
+ .getDeploymentName(),
+ sessionJob
+ .getMetadata()
+ .getNamespace())))
.withNamespacesInheritedFromController(context)
.followNamespaceChanges(true)
.build();
-
return new InformerEventSource<>(configuration, context);
}
- private static String compositeKey(String name, String namespace) {
- return String.format("%s_%s", name, namespace);
+ private static String indexKey(String name, String namespace) {
+ return name + "#" + namespace;
}
}
diff --git a/flink-kubernetes-operator/src/main/resources/log4j2.properties b/flink-kubernetes-operator/src/main/resources/log4j2.properties
index c3d3c15..f7f24f4 100644
--- a/flink-kubernetes-operator/src/main/resources/log4j2.properties
+++ b/flink-kubernetes-operator/src/main/resources/log4j2.properties
@@ -24,3 +24,6 @@ appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] [%X{resource.namespace}.%X{resource.name}] %msg%n%throwable}
+
+logger.conf.name = org.apache.flink.configuration.GlobalConfiguration
+logger.conf.level = WARN
diff --git a/pom.xml b/pom.xml
index 052074e..981815f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@ under the License.
<maven-resources-plugin.version>3.2.0</maven-resources-plugin.version>
<git-commit-id-maven-plugin.version>5.0.0</git-commit-id-maven-plugin.version>
- <operator.sdk.version>3.0.2</operator.sdk.version>
+ <operator.sdk.version>3.0.3</operator.sdk.version>
<operator.sdk.admission-controller.version>0.2.0</operator.sdk.admission-controller.version>
<fabric8.version>5.12.2</fabric8.version>