Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/19 09:43:35 UTC

[GitHub] [flink] Mrart opened a new pull request, #21527: [FLINK-27925] watch tm pod performance optimization

Mrart opened a new pull request, #21527:
URL: https://github.com/apache/flink/pull/21527

   
   ## What is the purpose of the change
   
   When jobs are started and stopped at large scale, constantly reading data from etcd can turn etcd into a performance bottleneck. We can set `resourceVersion=0` on the watch to reduce the amount of data read from etcd.
   
   
   ## Brief change log
   
     - Add `withResourceVersion("0")` in `Fabric8FlinkKubeClient#watchPodsAndDoCallback`
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no)
     - The serializers: (no / don't)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Mrart commented on pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "Mrart (via GitHub)" <gi...@apache.org>.
Mrart commented on PR #21527:
URL: https://github.com/apache/flink/pull/21527#issuecomment-1585870257

   > @Mrart It seems that the e2e test case has failed. Can you rebase onto the latest `master` branch to see if that fixes it?
   
   Fixed




[GitHub] [flink] Mrart commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "Mrart (via GitHub)" <gi...@apache.org>.
Mrart commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1192918839


##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
   I have tested this: it waits at most 1 second.





[GitHub] [flink] Mrart commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "Mrart (via GitHub)" <gi...@apache.org>.
Mrart commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1191070958


##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
   It definitely waits for 1 second.





[GitHub] [flink] wangyang0918 commented on pull request #21527: [FLINK-27925] watch tm pod performance optimization

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on PR #21527:
URL: https://github.com/apache/flink/pull/21527#issuecomment-1357635178

   We need to add a test to guard this behavior.




[GitHub] [flink] reswqa commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1211033434


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java:
##########
@@ -115,4 +115,9 @@ public class Constants {
     public static final String KUBERNETES_TASK_MANAGER_SCRIPT_PATH = "kubernetes-taskmanager.sh";
 
     public static final String ENV_TM_JVM_MEM_OPTS = "FLINK_TM_JVM_MEM_OPTS";
+
+    // "resourceVersion="0" is means "Any".  Return data at any resource version.It saves time to

Review Comment:
   ```suggestion
       // "resourceVersion="0" means any resource version. It saves time to
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java:
##########
@@ -65,7 +69,13 @@ void testClosingWithException() {
     void testCallbackHandler() {
         FlinkPod pod = new FlinkPod.Builder().build();
         final KubernetesPodsWatcher podsWatcher =
-                new KubernetesPodsWatcher(new TestingCallbackHandler(e -> {}));
+                new KubernetesPodsWatcher(
+                        TestingWatchCallbackHandler.<KubernetesPod>builder()
+                                .setOnAddedConsumer(pods -> podAddedList.addAll(pods))
+                                .setOnModifiedConsumer(pods -> podModifiedList.addAll(pods))
+                                .setOnDeletedConsumer(pods -> podDeletedList.addAll(pods))
+                                .setOnErrorConsumer(pods -> podErrorList.addAll(pods))

Review Comment:
   It seems that these 4 lists are only used in this test case. I'd suggest moving their declarations into this method.





[GitHub] [flink] reswqa commented on pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #21527:
URL: https://github.com/apache/flink/pull/21527#issuecomment-1580742874

   @Mrart It seems that the e2e test case has failed. Can you rebase onto the latest `master` branch to see if that fixes it?




[GitHub] [flink] reswqa commented on pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #21527:
URL: https://github.com/apache/flink/pull/21527#issuecomment-1566618712

   It seems that some conflicts need to be resolved.




[GitHub] [flink] huwh commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160572180


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String clusterId) {
 
     @Override
     public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
-        final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems();
+        final List<Pod> podList =
+                this.internalClient
+                        .pods()
+                        .withLabels(labels)
+                        .list(new ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   This "0" could be extracted as a static final variable



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =

Review Comment:
   maybe "pod" is enough, since there is only one pod here



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {

Review Comment:
   It would be better to pass the namespace, name, and resource version as function arguments.
   



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -277,6 +281,23 @@ void testStopPod() throws ExecutionException, InterruptedException {
         assertThat(this.kubeClient.pods().inNamespace(NAMESPACE).withName(podName).get()).isNull();
     }
 
+    @Test
+    void testGetPodsWithLabels() {
+        final String podName = "pod-with-labels";
+        final Pod pod =
+                new PodBuilder()
+                        .editOrNewMetadata()
+                        .withName(podName)
+                        .withLabels(TESTING_LABELS)
+                        .endMetadata()
+                        .editOrNewSpec()
+                        .endSpec()
+                        .build();
+        this.kubeClient.pods().inNamespace(NAMESPACE).create(pod);
+        List<KubernetesPod> kubernetesPods = this.flinkKubeClient.getPodsWithLabels(TESTING_LABELS);
+        assertThat(kubernetesPods.size()).isEqualTo(1);

Review Comment:
   We should check that the listed pod is the expected one:
   
           assertThat(kubernetesPods)
                   .satisfiesExactly(
                           kubernetesPod -> assertThat(kubernetesPod.getName()).isEqualTo(podName));



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }
 
+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
   It would be better to check that each event is received.
   
   Maybe you can use three `CountDownLatch` instances and an anonymous class here.
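
The idea of one latch per event type can be sketched with the JDK alone. This is only an illustration under stated assumptions: `PerEventLatchSketch`, `runOnce`, and the nested `WatchCallbackHandler` interface are hypothetical stand-ins (the interface mirrors the four callbacks shown in the diff), and a plain thread simulates the mocked watch.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class PerEventLatchSketch {

    /** Hypothetical stand-in for FlinkKubeClient.WatchCallbackHandler (four callbacks as in the diff). */
    interface WatchCallbackHandler<T> {
        void onAdded(List<T> resources);

        void onModified(List<T> resources);

        void onDeleted(List<T> resources);

        void onError(List<T> resources);
    }

    /** Emits one event of each kind and reports whether every latch was released in time. */
    static boolean runOnce() {
        final CountDownLatch addedLatch = new CountDownLatch(1);
        final CountDownLatch modifiedLatch = new CountDownLatch(1);
        final CountDownLatch deletedLatch = new CountDownLatch(1);

        // Anonymous class: each callback releases only its own latch.
        final WatchCallbackHandler<String> handler =
                new WatchCallbackHandler<String>() {
                    @Override
                    public void onAdded(List<String> resources) {
                        addedLatch.countDown();
                    }

                    @Override
                    public void onModified(List<String> resources) {
                        modifiedLatch.countDown();
                    }

                    @Override
                    public void onDeleted(List<String> resources) {
                        deletedLatch.countDown();
                    }

                    @Override
                    public void onError(List<String> resources) {
                        // Deliberately not asserted in this sketch.
                    }
                };

        // Simulated event source; a real test would register the handler with the client.
        final List<String> pods = Collections.singletonList("tm_pod1");
        final Thread source =
                new Thread(
                        () -> {
                            handler.onAdded(pods);
                            handler.onModified(pods);
                            handler.onDeleted(pods);
                        });
        source.start();

        try {
            return addedLatch.await(1, TimeUnit.SECONDS)
                    && modifiedLatch.await(1, TimeUnit.SECONDS)
                    && deletedLatch.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all three events observed: " + runOnce());
    }
}
```

Unlike a single shared latch, this fails if one event type never arrives, even when another type is delivered several times.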



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -630,4 +661,39 @@ private KubernetesConfigMap buildTestingConfigMap() {
                         .withData(data)
                         .build());
     }
+
+    private class TestingKubernetesPodCallbackHandler
+            implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
+
+        private final CountDownLatch eventLatch;
+
+        public TestingKubernetesPodCallbackHandler(CountDownLatch eventLatch) {
+            this.eventLatch = eventLatch;
+        }
+
+        @Override
+        public void onAdded(List<KubernetesPod> resources) {
+            this.eventLatch.countDown();
+        }
+
+        @Override
+        public void onModified(List<KubernetesPod> resources) {
+            this.eventLatch.countDown();
+        }
+
+        @Override
+        public void onDeleted(List<KubernetesPod> resources) {
+            this.eventLatch.countDown();
+        }
+
+        @Override
+        public void onError(List<KubernetesPod> resources) {

Review Comment:
   An ERROR event will not trigger `Watcher#eventReceived`, so this unit test will fail.
   
   Maybe we can just skip this in the unit test.
   
   ref: https://github.com/fabric8io/kubernetes-client/blob/master/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java#L325



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")

Review Comment:
   This path can be formatted using String.format.
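
A minimal sketch of that suggestion, using only the JDK. The class `WatchPathSketch`, the method `podWatchPath`, and the constant name `ZERO_RESOURCE_VERSION` are hypothetical; the namespace, labels, and query parameters are the ones visible in the hard-coded path above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class WatchPathSketch {

    // Hypothetical constant name; "0" is the value used in the diff above.
    static final String ZERO_RESOURCE_VERSION = "0";

    /** Builds the mock-server watch path from its parts instead of hard-coding it. */
    static String podWatchPath(
            String namespace, Map<String, String> labels, String resourceVersion) {
        // Join labels as key=value pairs separated by commas, then URL-encode the selector.
        final String selector =
                labels.entrySet().stream()
                        .map(e -> e.getKey() + "=" + e.getValue())
                        .collect(Collectors.joining(","));
        return String.format(
                "/api/v1/namespaces/%s/pods?labelSelector=%s&resourceVersion=%s"
                        + "&allowWatchBookmarks=true&watch=true",
                namespace, URLEncoder.encode(selector, StandardCharsets.UTF_8), resourceVersion);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the selector is deterministic.
        final Map<String, String> labels = new LinkedHashMap<>();
        labels.put("label1", "value1");
        labels.put("label2", "value2");
        System.out.println(podWatchPath("test", labels, ZERO_RESOURCE_VERSION));
    }
}
```

With the two labels above this reproduces the literal path in the diff, and it also covers the earlier suggestion to pass the namespace and resource version in as arguments.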



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
   1000 ms is a bit long.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }
 
+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(
+                TESTING_LABELS, new TestingKubernetesPodCallbackHandler(eventLatch));
+        assertTrue(eventLatch.await(10, TimeUnit.SECONDS));

Review Comment:
   We need to use JUnit 5 and AssertJ in unit tests.
   
   assertThat(eventLatch.await(1000, TimeUnit.MILLISECONDS)).isTrue();





[GitHub] [flink] Mrart commented on pull request #21527: [FLINK-27925] watch tm pod performance optimization

Posted by GitBox <gi...@apache.org>.
Mrart commented on PR #21527:
URL: https://github.com/apache/flink/pull/21527#issuecomment-1368790057

   @wangyang0918 Added a unit test to verify that we can use `withResourceVersion("0")` to listen for pod events when the pod's resourceVersion is 5668. 5668 is an arbitrary resource version.




[GitHub] [flink] Mrart commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "Mrart (via GitHub)" <gi...@apache.org>.
Mrart commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1191067760


##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
   The waitFor parameter specifies the amount of time the program should wait for the expected output to appear before timing out. If the expected output does not appear within the specified time, the program will raise a timeout error.





[GitHub] [flink] reswqa merged pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa merged PR #21527:
URL: https://github.com/apache/flink/pull/21527




[GitHub] [flink] flinkbot commented on pull request #21527: [FLINK-27925] watch tm pod performance optimization

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21527:
URL: https://github.com/apache/flink/pull/21527#issuecomment-1357371802

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d72ad3c7103db9caf25491cca4df983c267c01e2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d72ad3c7103db9caf25491cca4df983c267c01e2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d72ad3c7103db9caf25491cca4df983c267c01e2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Mrart commented on pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by GitBox <gi...@apache.org>.
Mrart commented on PR #21527:
URL: https://github.com/apache/flink/pull/21527#issuecomment-1373105538

   @flinkbot run azure




[GitHub] [flink] Mrart commented on pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "Mrart (via GitHub)" <gi...@apache.org>.
Mrart commented on PR #21527:
URL: https://github.com/apache/flink/pull/21527#issuecomment-1568348172

   @flinkbot run azure




[GitHub] [flink] reswqa commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160799424


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String clusterId) {
 
     @Override
     public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
-        final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems();
+        final List<Pod> podList =
+                this.internalClient
+                        .pods()
+                        .withLabels(labels)
+                        .list(new ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   ```suggestion
                           .list(new ListOptionsBuilder().withResourceVersion("0").build())
   ```
   We'd better add some comments for this magic number.



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -233,6 +239,7 @@ public KubernetesWatch watchPodsAndDoCallback(
                                                         this.internalClient
                                                                 .pods()
                                                                 .withLabels(labels)
+                                                                .withResourceVersion("0")

Review Comment:
   Refer to previous comments.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
   I'm not very familiar with this api. Can anyone tell me if this means we will definitely wait for `1` second or at most `1` second. If it is the former, it should definitely be prohibited because it will seriously slow down the execution time of `AZP`.
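
The two meanings being asked about can be illustrated with JDK primitives. This sketch makes no claim about which one the mock server's `waitFor` implements; `WaitSemanticsSketch` and both method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class WaitSemanticsSketch {

    /** "Definitely wait": always blocks for the full delay. Returns the elapsed milliseconds. */
    static long fixedDelayMillis(long delayMillis) {
        final long start = System.nanoTime();
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** "At most": returns as soon as the latch is released, or once the timeout expires. */
    static long boundedWaitMillis(CountDownLatch latch, long timeoutMillis) {
        final long start = System.nanoTime();
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        final CountDownLatch ready = new CountDownLatch(1);
        ready.countDown(); // The awaited condition is already satisfied.

        System.out.println("fixed delay took ~" + fixedDelayMillis(200) + " ms");
        System.out.println("bounded wait took ~" + boundedWaitMillis(ready, 1000) + " ms");
    }
}
```

A fixed delay adds its full duration to every test run, while a bounded wait only costs the full timeout when the condition is never met.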



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }
 
+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
   It seems that we have too many classes implementing `FlinkKubeClient.WatchCallbackHandler<T>` only for testing purposes. We'd better introduce a more general and reusable `TestingKubernetesPodCallbackHandler` as the first commit and rewrite all other implementations like `NoOpWatchCallbackHandler` and `TestingCallbackHandler`.
   
   Don't worry too much, if you don't know how to do this refactor, I can push this commit as example.
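
One possible shape for such a reusable handler is a builder that takes one consumer per callback. This is a sketch under assumptions, not Flink's actual class: the nested `WatchCallbackHandler` interface is a stand-in, and the builder method names simply follow the `setOn...Consumer` style seen in the `KubernetesPodsWatcherTest` diff in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

final class TestingHandlerSketch {

    /** Hypothetical stand-in for FlinkKubeClient.WatchCallbackHandler. */
    interface WatchCallbackHandler<T> {
        void onAdded(List<T> resources);
        void onModified(List<T> resources);
        void onDeleted(List<T> resources);
        void onError(List<T> resources);
    }

    /** Delegates each callback to a consumer supplied by the test. */
    static final class TestingWatchCallbackHandler<T> implements WatchCallbackHandler<T> {
        private final Consumer<List<T>> onAdded;
        private final Consumer<List<T>> onModified;
        private final Consumer<List<T>> onDeleted;
        private final Consumer<List<T>> onError;

        private TestingWatchCallbackHandler(Builder<T> builder) {
            this.onAdded = builder.onAdded;
            this.onModified = builder.onModified;
            this.onDeleted = builder.onDeleted;
            this.onError = builder.onError;
        }

        @Override public void onAdded(List<T> resources) { onAdded.accept(resources); }
        @Override public void onModified(List<T> resources) { onModified.accept(resources); }
        @Override public void onDeleted(List<T> resources) { onDeleted.accept(resources); }
        @Override public void onError(List<T> resources) { onError.accept(resources); }

        static <T> Builder<T> builder() {
            return new Builder<>();
        }

        static final class Builder<T> {
            // Every consumer defaults to a no-op, so a test sets only what it needs.
            private Consumer<List<T>> onAdded = ignored -> {};
            private Consumer<List<T>> onModified = ignored -> {};
            private Consumer<List<T>> onDeleted = ignored -> {};
            private Consumer<List<T>> onError = ignored -> {};

            Builder<T> setOnAddedConsumer(Consumer<List<T>> c) { this.onAdded = c; return this; }
            Builder<T> setOnModifiedConsumer(Consumer<List<T>> c) { this.onModified = c; return this; }
            Builder<T> setOnDeletedConsumer(Consumer<List<T>> c) { this.onDeleted = c; return this; }
            Builder<T> setOnErrorConsumer(Consumer<List<T>> c) { this.onError = c; return this; }

            TestingWatchCallbackHandler<T> build() {
                return new TestingWatchCallbackHandler<>(this);
            }
        }
    }

    /** Records ADDED pods only; the DELETED event falls through to the default no-op. */
    static List<String> demo() {
        final List<String> addedPods = new ArrayList<>();
        final WatchCallbackHandler<String> handler =
                TestingWatchCallbackHandler.<String>builder()
                        .setOnAddedConsumer(addedPods::addAll)
                        .build();
        handler.onAdded(Arrays.asList("pod-a", "pod-b"));
        handler.onDeleted(Collections.singletonList("pod-a"));
        return addedPods;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With no-op defaults, a handler built without any consumers can stand in for a dedicated no-op implementation, and a handler with a single consumer covers the one-callback test helpers.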



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String clusterId) {
 
     @Override
     public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
-        final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems();
+        final List<Pod> podList =
+                this.internalClient
+                        .pods()
+                        .withLabels(labels)
+                        .list(new ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   +1 for this.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
   This time also should be extracted to a constant value.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -68,9 +70,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
+import static org.junit.Assert.assertTrue;
 
 /** Tests for Fabric implementation of {@link FlinkKubeClient}. */
 public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
+

Review Comment:
   Why add this new line?



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }
 
+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
   I agree that we should also check all received events. IMHO, we can introduce three `CompletableFuture<Watch.Action>` to handle this.
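
A rough sketch of the three-futures idea, written against the JDK only. The `Action` enum here is a hypothetical stand-in for fabric8's `Watch.Action`, and `PerEventFutureSketch`, `onEvent`, and `runOnce` are illustrative names; a plain thread plays the role of the watcher.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PerEventFutureSketch {

    /** Hypothetical stand-in for fabric8's Watch.Action, so the sketch needs no dependency. */
    enum Action {
        ADDED,
        MODIFIED,
        DELETED
    }

    final CompletableFuture<Action> added = new CompletableFuture<>();
    final CompletableFuture<Action> modified = new CompletableFuture<>();
    final CompletableFuture<Action> deleted = new CompletableFuture<>();

    /** Called by the simulated watcher; completes the future that matches the event type. */
    void onEvent(Action action) {
        switch (action) {
            case ADDED:
                added.complete(action);
                break;
            case MODIFIED:
                modified.complete(action);
                break;
            case DELETED:
                deleted.complete(action);
                break;
        }
    }

    /** Fires one event of each kind and checks that every future completes with its own action. */
    static boolean runOnce() {
        final PerEventFutureSketch sketch = new PerEventFutureSketch();
        final Thread source =
                new Thread(
                        () -> {
                            for (Action action : Action.values()) {
                                sketch.onEvent(action);
                            }
                        });
        source.start();

        try {
            return sketch.added.get(1, TimeUnit.SECONDS) == Action.ADDED
                    && sketch.modified.get(1, TimeUnit.SECONDS) == Action.MODIFIED
                    && sketch.deleted.get(1, TimeUnit.SECONDS) == Action.DELETED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            // A missing event shows up as a timeout on the corresponding future.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all three futures completed: " + runOnce());
    }
}
```

Compared with latches, each future also carries the received value, so the test can assert on what arrived and not only that something arrived.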


