Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/11 03:09:30 UTC

[GitHub] [flink] wangyang0918 commented on a change in pull request #18854: [FLINK-25648][Kubernetes] Avoid redundant to query Kubernetes deployment when creating task manager pods

wangyang0918 commented on a change in pull request #18854:
URL: https://github.com/apache/flink/pull/18854#discussion_r824350934



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -130,15 +132,16 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet
 
     @Override
     public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
+        if (masterDeployment == null) {

Review comment:
       This network call blocks the main thread of the Flink ResourceManager. Also, after this change, `Fabric8FlinkKubeClient` is no longer thread-safe.
   
   So I suggest using `private final AtomicReference<Deployment> masterDeploymentRef` to store the JM deployment. We could also run the get-and-update of the reference on the `kubeClientExecutorService`.
   
   WDYT?
   
   ```
       @Override
       public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
           return CompletableFuture.runAsync(
                   () -> {
                       if (masterDeploymentRef.get() == null) {
                           final Deployment masterDeployment =
                                   this.internalClient
                                           .apps()
                                           .deployments()
                                           .withName(KubernetesUtils.getDeploymentName(clusterId))
                                           .get();
   
                           if (masterDeployment == null) {
                               throw new RuntimeException(
                                       "Failed to find Deployment named "
                                               + clusterId
                                               + " in namespace "
                                               + this.namespace);
                           }
   
                           masterDeploymentRef.compareAndSet(null, masterDeployment);
                       }
   
                       // Note that we should use the uid of the master Deployment for the
                       // OwnerReference.
                       setOwnerReference(
                               checkNotNull(masterDeploymentRef.get()),
                               Collections.singletonList(kubernetesPod.getInternalResource()));
   
                       LOG.debug(
                               "Start to create pod with spec {}{}",
                               System.lineSeparator(),
                               KubernetesUtils.tryToGetPrettyPrintYaml(
                                       kubernetesPod.getInternalResource()));
   
                       this.internalClient.pods().create(kubernetesPod.getInternalResource());
                   },
                   kubeClientExecutorService);
       }
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -130,15 +132,16 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet
 
     @Override
     public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {

Review comment:
       We also need to add a test to guard this change.
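
As a rough idea of what such a test could assert, here is a minimal, self-contained sketch. It does not use Flink or fabric8 classes: `LazyOwnerLookupSketch`, `remoteLookup`, and `countLookups` are hypothetical stand-ins for the client, the Deployment GET, and the test body. The check is that creating several pods triggers only one lookup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical stand-in for the kube client; not Flink's actual class. */
final class LazyOwnerLookupSketch {
    // Caches the result of the (assumed expensive) remote lookup.
    private final AtomicReference<String> ownerRef = new AtomicReference<>();
    // Stands in for the Kubernetes GET of the master Deployment.
    private final Supplier<String> remoteLookup;
    private final ExecutorService executor;

    LazyOwnerLookupSketch(Supplier<String> remoteLookup, ExecutorService executor) {
        this.remoteLookup = remoteLookup;
        this.executor = executor;
    }

    /** Runs the lookup (only if nothing is cached yet) and the "create" off the caller's thread. */
    CompletableFuture<String> createPod(String podName) {
        return CompletableFuture.supplyAsync(
                () -> {
                    if (ownerRef.get() == null) {
                        final String owner = remoteLookup.get();
                        if (owner == null) {
                            throw new IllegalStateException("Owner not found");
                        }
                        // Only the first successful lookup is kept.
                        ownerRef.compareAndSet(null, owner);
                    }
                    return podName + " owned by " + ownerRef.get();
                },
                executor);
    }

    /** Creates the given number of pods sequentially and returns how many lookups happened. */
    static int countLookups(int pods) {
        final AtomicInteger lookups = new AtomicInteger();
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final LazyOwnerLookupSketch client =
                    new LazyOwnerLookupSketch(
                            () -> {
                                lookups.incrementAndGet();
                                return "uid-1";
                            },
                            executor);
            for (int i = 0; i < pods; i++) {
                client.createPod("tm-" + i).join();
            }
            return lookups.get();
        } finally {
            executor.shutdown();
        }
    }
}
```

The sketch uses a single-threaded executor so the count is deterministic. With a multi-threaded executor, concurrent first calls may each perform the lookup, and `compareAndSet` then keeps only the first result, so a real test would probably assert "at least one lookup, and no further lookups once the reference is set".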



