Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/07 08:45:53 UTC

[GitHub] [flink-kubernetes-operator] kelemensanyi opened a new pull request #44: [FLINK-26370] Use unbounded ThreadPool for Flink cluster communication

kelemensanyi opened a new pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44


   This PR configures the operator SDK to use an unbounded thread pool for its control loop.
   
   A thread that blocks in the reconciler loop for one resource will no longer hold back the reconciliation of other resources.
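
To illustrate the effect described above, here is a minimal sketch that uses only the JDK. The class and method names are hypothetical and do not come from the PR. One task blocks the way a reconciliation stuck on a slow cluster might, and the sketch checks whether an unrelated quick task can still run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo; the names here are illustrative and not taken from the PR. */
final class BlockedReconcilerDemo {

    /**
     * Submits one task that blocks until released and one quick task, then reports
     * whether the quick task finished while the first one was still blocked.
     */
    static boolean quickTaskRunsWhileOtherBlocks(ExecutorService pool) {
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch quickDone = new CountDownLatch(1);
        try {
            // Stands in for a reconciliation stuck on a slow Flink cluster.
            pool.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Stands in for the reconciliation of an unrelated resource.
            pool.submit(quickDone::countDown);
            return quickDone.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("single thread: "
                + quickTaskRunsWhileOtherBlocks(Executors.newFixedThreadPool(1)));
        System.out.println("cached pool:   "
                + quickTaskRunsWhileOtherBlocks(Executors.newCachedThreadPool()));
    }
}
```

With a single worker thread the quick task waits behind the blocked one; with a cached pool it gets its own thread.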


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r827749633



##########
File path: helm/flink-operator/values.yaml
##########
@@ -61,6 +61,7 @@ operatorConfiguration:
     metrics.reporter.slf4j.interval: 5 MINUTE
 
     operator.reconciler.reschedule.interval.sec: 15
+    operator.reconciliation.max.parallelism: -1

Review comment:
       We do not need to add this here; it is already in the defaults.







[GitHub] [flink-kubernetes-operator] kelemensanyi commented on a change in pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
kelemensanyi commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r827496583



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
##########
@@ -74,12 +88,43 @@ public static void main(String... args) {
                         reconcilerFactory,
                         observerFactory);
 
-        FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
+        controllerConfig = new FlinkControllerConfig(controller);
         controller.setControllerConfig(controllerConfig);
         controllerConfig.setConfigurationService(configurationService);
+    }
 
+    public void run() {
         operator.register(controller, controllerConfig);
         operator.installShutdownHook();
         operator.start();
     }
+
+    protected ConfigurationService getConfigurationService() {
+        return new ConfigurationServiceDecorator(DefaultConfigurationService.instance()) {
+            @Override
+            public ExecutorService getExecutorService() {
+                int maxPoolSize = getReconciliationMaxPoolSize();
+                return new ThreadPoolExecutor(
+                        0, maxPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+            }
+
+            private int getReconciliationMaxPoolSize() {
+                int value =
+                        EnvUtils.getPositiveOrMinusOneInt(
+                                EnvUtils.ENV_RECONCILIATION_MAX_PARALLELISM,
+                                Integer.MAX_VALUE,
+                                "Specify a positive number or -1 for infinite parallelism.");

Review comment:
       No worries. :) Fixed now, please have another look.







[GitHub] [flink-kubernetes-operator] asfgit closed pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44


   





[GitHub] [flink-kubernetes-operator] kelemensanyi commented on a change in pull request #44: [FLINK-26370] Use unbounded ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
kelemensanyi commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r826468015



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
##########
@@ -78,12 +88,27 @@ public static void main(String... args) {
                         jobReconciler,
                         sessionReconciler);
 
-        FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
+        controllerConfig = new FlinkControllerConfig(controller);
         controller.setControllerConfig(controllerConfig);
         controllerConfig.setConfigurationService(configurationService);
+    }
 
+    public void run() {
         operator.register(controller, controllerConfig);
         operator.installShutdownHook();
         operator.start();
     }
+
+    protected ConfigurationService getConfigurationService() {
+        return new ConfigurationServiceDecorator(DefaultConfigurationService.instance()) {
+            @Override
+            public ExecutorService getExecutorService() {
+                return Executors.newCachedThreadPool();

Review comment:
       The only case I can see for setting a limit is when the operator is running out of memory. In that case, setting a limit could keep the operator running.
   
   I've added a new config option for that: `RECONCILIATION_MAX_PARALLELISM`
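
A later revision quoted in this thread builds the pool as `new ThreadPoolExecutor(0, maxPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<>())`. The sketch below reuses those constructor arguments; the class, the method names, and the mapping of `-1` to `Integer.MAX_VALUE` inside `create` are assumptions made for illustration. It also shows one consequence of the design: a `SynchronousQueue` has no capacity, so once the limit is reached the default rejection policy refuses further tasks instead of queueing them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; only the ThreadPoolExecutor arguments mirror the diff in this thread. */
final class BoundedReconcilerPool {

    /** Treats -1 as "no limit" (an assumption based on the option's description). */
    static ThreadPoolExecutor create(int maxParallelism) {
        int maxPoolSize = maxParallelism == -1 ? Integer.MAX_VALUE : maxParallelism;
        // SynchronousQueue has no capacity, so every task needs its own free thread.
        return new ThreadPoolExecutor(
                0, maxPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
    }

    /** Returns true if a task submitted beyond the limit is rejected rather than queued. */
    static boolean rejectsWhenSaturated(int limit) {
        ThreadPoolExecutor pool = create(limit);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < limit; i++) {
                pool.execute(() -> awaitQuietly(release)); // occupy every thread
            }
            pool.execute(() -> { }); // one task too many
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(create(-1).getMaximumPoolSize() == Integer.MAX_VALUE);
        System.out.println(rejectsWhenSaturated(2));
    }
}
```

Whether rejection or queueing is the better behaviour at the limit is a design choice; a fixed-size pool backed by an unbounded queue queues instead.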

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
##########
@@ -27,33 +27,43 @@
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.javaoperatorsdk.ConfigurationServiceDecorator;
 import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.api.config.ConfigurationService;
 import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /** Main Class for Flink native k8s operator. */
 public class FlinkOperator {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
 
-    public static void main(String... args) {
+    final Operator operator;

Review comment:
       Fixed now.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #44: [FLINK-26370] Use unbounded ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r820508224



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
##########
@@ -78,12 +88,27 @@ public static void main(String... args) {
                         jobReconciler,
                         sessionReconciler);
 
-        FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
+        controllerConfig = new FlinkControllerConfig(controller);
         controller.setControllerConfig(controllerConfig);
         controllerConfig.setConfigurationService(configurationService);
+    }
 
+    public void run() {
         operator.register(controller, controllerConfig);
         operator.installShutdownHook();
         operator.start();
     }
+
+    protected ConfigurationService getConfigurationService() {
+        return new ConfigurationServiceDecorator(DefaultConfigurationService.instance()) {
+            @Override
+            public ExecutorService getExecutorService() {
+                return Executors.newCachedThreadPool();

Review comment:
       Should this always be of unlimited size? Maybe we should make it configurable. What do you think?

##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+/** @link FlinkOperator unit tests. */
+public class FlinkOperatorTest {
+
+    @Test
+    public void testExecutorServiceCanRun100ThreadsParallel() throws Exception {

Review comment:
       It seems the test only checks that the cached executor service works as it should. That feels unnecessary; I think it would be enough to assert that the executor service we get back from the operator has the expected type and configuration.
   
   Even better would be to actually start the operator, submit multiple deployments (with different names) in parallel, and check that it handles them.
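
A configuration-level assertion of the kind suggested above might look like the following sketch. It assumes the executor is a `java.util.concurrent.ThreadPoolExecutor`, which is what current OpenJDK versions return from `Executors.newCachedThreadPool()` and `Executors.newFixedThreadPool(int)` but is an implementation detail rather than a documented guarantee. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical check; it relies on the JDK handing back a ThreadPoolExecutor. */
final class ExecutorConfigCheck {

    /** Returns the maximum pool size, or -1 if the executor does not expose one. */
    static int maxPoolSizeOf(ExecutorService executor) {
        if (executor instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) executor).getMaximumPoolSize();
        }
        return -1;
    }

    public static void main(String[] args) {
        ExecutorService cached = Executors.newCachedThreadPool();
        ExecutorService fixed = Executors.newFixedThreadPool(5);
        System.out.println("cached max: " + maxPoolSizeOf(cached));
        System.out.println("fixed max:  " + maxPoolSizeOf(fixed));
        cached.shutdown();
        fixed.shutdown();
    }
}
```

In a JUnit test the same values could be compared with `Assertions.assertEquals`.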

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
##########
@@ -27,33 +27,43 @@
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.javaoperatorsdk.ConfigurationServiceDecorator;
 import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.api.config.ConfigurationService;
 import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /** Main Class for Flink native k8s operator. */
 public class FlinkOperator {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
 
-    public static void main(String... args) {
+    final Operator operator;

Review comment:
       Let's keep this `private final` and add a protected getter with the `@VisibleForTesting` annotation.







[GitHub] [flink-kubernetes-operator] kelemensanyi commented on pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
kelemensanyi commented on pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#issuecomment-1075807670


   Thanks @gyfora for finding out the details about the configuration. What you suggested sounds good.
   
   I think I have implemented that as well, but while testing locally I see the operator sometimes restarting and sometimes hanging. I'm not sure whether that is only my local environment, but I have the impression that exceptions don't always make it into the logs.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#issuecomment-1074943957


   I looked into this a bit more closely. I think the correct solution here is the following:
   
   Remove `ConfigurationServiceDecorator` and use `ConfigurationServiceOverrider`, which already does this. You can see it here (https://github.com/apache/flink-kubernetes-operator/pull/91) and here: https://github.com/java-operator-sdk/java-operator-sdk/blob/v2.1.1/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
   
   If the user specifies an unbounded thread pool, we override the executor service directly, as you do now. If the user specifies a bounded number of threads, we only override the number of threads with `withConcurrentReconciliationThreads`. That will default to `Executors.newFixedThreadPool(concurrentReconciliationThreads());`, which seems to work well already.
   
   Also the default number of threads currently is 5.
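
The two modes described above can be sketched with the JDK alone. This is not the SDK integration itself: in the operator the choice would go through `ConfigurationServiceOverrider`, whereas the hypothetical `ReconcilerExecutors.forParallelism` below only shows which kind of pool each setting leads to. The `UNBOUNDED` constant and the validation are assumptions; the error message is taken from the diff quoted in this thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical selector; the real change would be wired through the SDK's overrider. */
final class ReconcilerExecutors {

    /** Assumed sentinel for "no limit", matching the -1 used in this thread. */
    static final int UNBOUNDED = -1;

    static ExecutorService forParallelism(int maxParallelism) {
        if (maxParallelism == UNBOUNDED) {
            // Unbounded: threads are created on demand and reclaimed when idle.
            return Executors.newCachedThreadPool();
        }
        if (maxParallelism < 1) {
            throw new IllegalArgumentException(
                    "Specify a positive number or -1 for infinite parallelism.");
        }
        // Bounded: the kind of pool the comment above says the SDK defaults to.
        return Executors.newFixedThreadPool(maxParallelism);
    }

    public static void main(String[] args) {
        ExecutorService unbounded = forParallelism(UNBOUNDED);
        ExecutorService bounded = forParallelism(5);
        System.out.println(unbounded.getClass().getSimpleName());
        System.out.println(bounded.getClass().getSimpleName());
        unbounded.shutdown();
        bounded.shutdown();
    }
}
```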





[GitHub] [flink-kubernetes-operator] kelemensanyi commented on pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
kelemensanyi commented on pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#issuecomment-1075811730


   @gyfora, it might be that only my local environment is causing problems, as I've just noticed that the operator uses the default `imagePullPolicy`, which is `IfNotPresent`, so it might not redeploy the latest version of my builds from the registry. I'll give it a try tomorrow with the policy set to `Always`.





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r832925421



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/** @link FlinkOperator unit tests. */
+public class FlinkOperatorTest {
+
+    @Test
+    public void testExecutorServiceUsesReconciliationMaxParallelismFromConfig() {
+        final var defaultConfig = FlinkUtils.loadDefaultConfig();
+        final var operatorConfig =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
+        final int maxParallelism = operatorConfig.getReconcilerMaxParallelism();
+        final int threadCount = maxParallelism == -1 ? Integer.MAX_VALUE : maxParallelism;
+        System.out.println(threadCount);

Review comment:
       Please remove the print statement.

##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/** @link FlinkOperator unit tests. */
+public class FlinkOperatorTest {
+
+    @Test
+    public void testExecutorServiceUsesReconciliationMaxParallelismFromConfig() {
+        final var defaultConfig = FlinkUtils.loadDefaultConfig();
+        final var operatorConfig =

Review comment:
       We can remove the `final` keyword everywhere in this method; it is not really necessary and just adds more code.

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
##########
@@ -30,47 +31,55 @@
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.api.config.ConfigurationService;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
 import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Executors;
+
 /** Main Class for Flink native k8s operator. */
 public class FlinkOperator {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
 
-    public static void main(String... args) {
+    private final Operator operator;
+    private final FlinkDeploymentController controller;
+    private final FlinkControllerConfig controllerConfig;
+
+    public FlinkOperator() {
 
         LOG.info("Starting Flink Kubernetes Operator");
         DefaultConfig defaultConfig = FlinkUtils.loadDefaultConfig();
         OperatorMetricUtils.initOperatorMetrics(defaultConfig.getOperatorConfig());
 
-        DefaultKubernetesClient client = new DefaultKubernetesClient();
+        KubernetesClient client = new DefaultKubernetesClient();
         String namespace = client.getNamespace();
         if (namespace == null) {
             namespace = "default";
         }
 
-        ConfigurationService configurationService =
-                new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
-                        .checkingCRDAndValidateLocalModel(false)
-                        .build();
-
-        Operator operator = new Operator(client, configurationService);
-
         FlinkOperatorConfiguration operatorConfiguration =
                 FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
 
+        ConfigurationServiceOverrider configOverrider =
+                new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
+                        .checkingCRDAndValidateLocalModel(false);
+        configOverrider = setupExecutorService(configOverrider, operatorConfiguration);
+        ConfigurationService configurationService = configOverrider.build();

Review comment:
       This is getting long. We can move it into a dedicated method or one of the util classes, and probably merge it with `setupExecutorService`.

##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/** @link FlinkOperator unit tests. */
+public class FlinkOperatorTest {
+
+    @Test
+    public void testExecutorServiceUsesReconciliationMaxParallelismFromConfig() {
+        final var defaultConfig = FlinkUtils.loadDefaultConfig();

Review comment:
       You should probably create the Flink config directly by instantiating a `new Configuration`. That avoids environment-dependent test behavior, and you can easily test the two expected behavior modes here.







[GitHub] [flink-kubernetes-operator] kelemensanyi commented on a change in pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
kelemensanyi commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r833783330



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/** @link FlinkOperator unit tests. */
+public class FlinkOperatorTest {
+
+    @Test
+    public void testExecutorServiceUsesReconciliationMaxParallelismFromConfig() {
+        final var defaultConfig = FlinkUtils.loadDefaultConfig();

Review comment:
       Good point. Fixed now.







[GitHub] [flink-kubernetes-operator] kelemensanyi commented on a change in pull request #44: [FLINK-26370] Use unbounded ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
kelemensanyi commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r826485650



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+/** @link FlinkOperator unit tests. */
+public class FlinkOperatorTest {
+
+    @Test
+    public void testExecutorServiceCanRun100ThreadsParallel() throws Exception {

Review comment:
       I originally opted for the functional test because
   - the `ExecutorService` interface does not expose its configuration, and
   - the Controller delegated the construction of the executor service to the standard library, which might return a different class in future releases.
   
   Functional testing is not an ideal solution either, since we cannot test whether the pool is really unbounded and, as you suggested, we might be testing a bit more than is strictly necessary.
   
   I don't have a strong opinion here, so why not do the simplest then: I've changed the tests as you've suggested. 👍 
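
For reference, a functional check of the kind discussed above could be sketched as follows. This is not the test from the PR; the class and method names and the two-second timeout are assumptions. It shows that a given number of tasks can run at the same moment, which, as noted above, still says nothing about whether the pool is truly unbounded.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical probe; it is not the test that was part of the PR. */
final class ParallelismProbe {

    /** Returns true if {@code tasks} tasks were all running at the same moment. */
    static boolean runsInParallel(ExecutorService pool, int tasks) {
        CountDownLatch allStarted = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    allStarted.countDown();
                    try {
                        // Hold the thread until every task has started.
                        allStarted.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // If the pool has fewer threads than tasks, the latch never reaches zero.
            return allStarted.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsInParallel(Executors.newCachedThreadPool(), 100));
        System.out.println(runsInParallel(Executors.newFixedThreadPool(10), 100));
    }
}
```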







[GitHub] [flink-kubernetes-operator] kelemensanyi commented on a change in pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
kelemensanyi commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r832748931



##########
File path: helm/flink-operator/values.yaml
##########
@@ -61,6 +61,7 @@ operatorConfiguration:
     metrics.reporter.slf4j.interval: 5 MINUTE
 
     operator.reconciler.reschedule.interval.sec: 15
+    operator.reconciliation.max.parallelism: -1

Review comment:
       That's removed now.







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#issuecomment-1068793918


   Looks good @kelemensanyi, thank you! I will test it locally and merge afterwards :)





[GitHub] [flink-kubernetes-operator] kelemensanyi commented on pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
kelemensanyi commented on pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#issuecomment-1076897585


   I still couldn't get it to work reliably locally. It would be great if you could give it a try, @gyfora.





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #44: [FLINK-26370] Use unbounded ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r826607445



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
##########
@@ -74,12 +88,43 @@ public static void main(String... args) {
                         reconcilerFactory,
                         observerFactory);
 
-        FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
+        controllerConfig = new FlinkControllerConfig(controller);
         controller.setControllerConfig(controllerConfig);
         controllerConfig.setConfigurationService(configurationService);
+    }
 
+    public void run() {
         operator.register(controller, controllerConfig);
         operator.installShutdownHook();
         operator.start();
     }
+
+    protected ConfigurationService getConfigurationService() {
+        return new ConfigurationServiceDecorator(DefaultConfigurationService.instance()) {
+            @Override
+            public ExecutorService getExecutorService() {
+                int maxPoolSize = getReconciliationMaxPoolSize();
+                return new ThreadPoolExecutor(
+                        0, maxPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+            }
+
+            private int getReconciliationMaxPoolSize() {
+                int value =
+                        EnvUtils.getPositiveOrMinusOneInt(
+                                EnvUtils.ENV_RECONCILIATION_MAX_PARALLELISM,
+                                Integer.MAX_VALUE,
+                                "Specify a positive number or -1 for infinite parallelism.");

Review comment:
       I think it would be a bit nicer (and a lot simpler) to add this to `OperatorConfigOptions` and expose it through the `FlinkOperatorConfiguration` class instead of the env variable.
   
   We still have some env-variable-based configs in a few places, but I think for things like this the operator config is much better.
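
As a rough sketch of what the suggestion amounts to, independent of where the value is read from: the snippet below maps the `-1` convention from the diff above onto a pool size and builds the same kind of `ThreadPoolExecutor`. The class `ReconcilerPoolSketch` and both of its methods are hypothetical names for illustration. The configured value is passed in as a plain `int`, and how it would be wired into `OperatorConfigOptions` is deliberately left out.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of the operator code base. */
final class ReconcilerPoolSketch {

    /** Maps -1 to an effectively unbounded pool and rejects other non-positive values. */
    static int resolveMaxPoolSize(int configured) {
        if (configured == -1) {
            return Integer.MAX_VALUE;
        }
        if (configured < 1) {
            throw new IllegalArgumentException(
                    "Specify a positive number or -1 for infinite parallelism, got: "
                            + configured);
        }
        return configured;
    }

    /** Builds the pool with the same constructor arguments as the diff above. */
    static ThreadPoolExecutor newReconcilerPool(int configured) {
        return new ThreadPoolExecutor(
                0,
                resolveMaxPoolSize(configured),
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newReconcilerPool(-1);
        try {
            System.out.println(pool.getMaximumPoolSize() == Integer.MAX_VALUE);
        } finally {
            pool.shutdown();
        }
    }
}
```

One design point to keep in mind: with a `SynchronousQueue` there is no task buffer, so once a bounded pool has all of its threads busy, further submissions are rejected under the default rejection policy rather than queued.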







[GitHub] [flink-kubernetes-operator] kelemensanyi commented on a change in pull request #44: [FLINK-26370] Use configurable ThreadPool for Flink cluster communication

Posted by GitBox <gi...@apache.org>.
kelemensanyi commented on a change in pull request #44:
URL: https://github.com/apache/flink-kubernetes-operator/pull/44#discussion_r833783023



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
##########
@@ -30,47 +31,55 @@
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.api.config.ConfigurationService;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
 import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Executors;
+
 /** Main Class for Flink native k8s operator. */
 public class FlinkOperator {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
 
-    public static void main(String... args) {
+    private final Operator operator;
+    private final FlinkDeploymentController controller;
+    private final FlinkControllerConfig controllerConfig;
+
+    public FlinkOperator() {
 
         LOG.info("Starting Flink Kubernetes Operator");
         DefaultConfig defaultConfig = FlinkUtils.loadDefaultConfig();
         OperatorMetricUtils.initOperatorMetrics(defaultConfig.getOperatorConfig());
 
-        DefaultKubernetesClient client = new DefaultKubernetesClient();
+        KubernetesClient client = new DefaultKubernetesClient();
         String namespace = client.getNamespace();
         if (namespace == null) {
             namespace = "default";
         }
 
-        ConfigurationService configurationService =
-                new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
-                        .checkingCRDAndValidateLocalModel(false)
-                        .build();
-
-        Operator operator = new Operator(client, configurationService);
-
         FlinkOperatorConfiguration operatorConfiguration =
                 FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
 
+        ConfigurationServiceOverrider configOverrider =
+                new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
+                        .checkingCRDAndValidateLocalModel(false);
+        configOverrider = setupExecutorService(configOverrider, operatorConfiguration);
+        ConfigurationService configurationService = configOverrider.build();

Review comment:
       Fixed.

##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/** {@link FlinkOperator} unit tests. */
+public class FlinkOperatorTest {
+
+    @Test
+    public void testExecutorServiceUsesReconciliationMaxParallelismFromConfig() {
+        final var defaultConfig = FlinkUtils.loadDefaultConfig();
+        final var operatorConfig =

Review comment:
       Fixed.

##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/** {@link FlinkOperator} unit tests. */
+public class FlinkOperatorTest {
+
+    @Test
+    public void testExecutorServiceUsesReconciliationMaxParallelismFromConfig() {
+        final var defaultConfig = FlinkUtils.loadDefaultConfig();
+        final var operatorConfig =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
+        final int maxParallelism = operatorConfig.getReconcilerMaxParallelism();
+        final int threadCount = maxParallelism == -1 ? Integer.MAX_VALUE : maxParallelism;
+        System.out.println(threadCount);

Review comment:
       Fixed.



