You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/12 12:07:30 UTC

[GitHub] [flink] 1996fanrui opened a new pull request, #21304: [FLINK-30003][rpc] Wait the scheduler future is done before check

1996fanrui opened a new pull request, #21304:
URL: https://github.com/apache/flink/pull/21304

   ## What is the purpose of the change
   
   CI fails due to main thread didn't wait the future is done. The contextClassLoaders may be empty when checking.
   
   https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43092&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&s=ae4f8708-9994-57d3-c2d7-b892156e7812
   
   ![image](https://user-images.githubusercontent.com/38427477/201473174-91bef2a7-df57-4469-bbf2-66b5cfd0fafd.png)
   
   
   ## Brief change log
   
   Wait the scheduler future is done before check
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector:  no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #21304: [FLINK-30003][rpc] Wait the scheduler future is done before check

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21304:
URL: https://github.com/apache/flink/pull/21304#discussion_r1021186465


##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java:
##########
@@ -150,21 +154,28 @@ void testAkkaRpcService_ScheduleRunnableSetsFlinkContextClassLoader()
     void testAkkaRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader() {
         final int numberOfScheduledRuns = 2;
         final List<ClassLoader> contextClassLoaders = new ArrayList<>(numberOfScheduledRuns);
-        akkaRpcService
-                .getScheduledExecutor()
-                .scheduleAtFixedRate(
-                        () -> {
-                            if (contextClassLoaders.size() < numberOfScheduledRuns) {
-                                contextClassLoaders.add(
-                                        Thread.currentThread().getContextClassLoader());
-                            } else {
-                                throw new RuntimeException("cancel task");
-                            }
-                        },
-                        0,
-                        1,
-                        TimeUnit.MILLISECONDS);
-
+        ScheduledFuture<?> future =
+                akkaRpcService
+                        .getScheduledExecutor()
+                        .scheduleAtFixedRate(
+                                () -> {
+                                    if (contextClassLoaders.size() < numberOfScheduledRuns) {
+                                        contextClassLoaders.add(
+                                                Thread.currentThread().getContextClassLoader());
+                                    } else {
+                                        throw new RuntimeException("cancel task");
+                                    }
+                                },
+                                0,
+                                1,
+                                TimeUnit.MILLISECONDS);
+        try {
+            future.get();

Review Comment:
   use assertThatThrownBy



##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java:
##########
@@ -150,21 +154,28 @@ void testAkkaRpcService_ScheduleRunnableSetsFlinkContextClassLoader()
     void testAkkaRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader() {
         final int numberOfScheduledRuns = 2;
         final List<ClassLoader> contextClassLoaders = new ArrayList<>(numberOfScheduledRuns);
-        akkaRpcService
-                .getScheduledExecutor()
-                .scheduleAtFixedRate(
-                        () -> {
-                            if (contextClassLoaders.size() < numberOfScheduledRuns) {
-                                contextClassLoaders.add(
-                                        Thread.currentThread().getContextClassLoader());
-                            } else {
-                                throw new RuntimeException("cancel task");
-                            }
-                        },
-                        0,
-                        1,
-                        TimeUnit.MILLISECONDS);
-
+        ScheduledFuture<?> future =
+                akkaRpcService
+                        .getScheduledExecutor()
+                        .scheduleAtFixedRate(
+                                () -> {
+                                    if (contextClassLoaders.size() < numberOfScheduledRuns) {
+                                        contextClassLoaders.add(
+                                                Thread.currentThread().getContextClassLoader());
+                                    } else {
+                                        throw new RuntimeException("cancel task");
+                                    }
+                                },
+                                0,
+                                1,
+                                TimeUnit.MILLISECONDS);
+        try {
+            future.get();
+            fail("The future should fail.");

Review Comment:
   It could be more straight-forward to complete a future within `scheduleAtFixedRate` (before the throw the exception) that we wait for outside the executor.



##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java:
##########
@@ -173,21 +184,30 @@ void testAkkaRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader
     @Test
     void testAkkaRpcService_ScheduleRunnableWithFixedDelaySetsFlinkContextClassLoader() {
         final int numberOfScheduledRuns = 2;
+
         final List<ClassLoader> contextClassLoaders = new ArrayList<>(numberOfScheduledRuns);
-        akkaRpcService
-                .getScheduledExecutor()
-                .scheduleWithFixedDelay(
-                        () -> {
-                            if (contextClassLoaders.size() < numberOfScheduledRuns) {
-                                contextClassLoaders.add(
-                                        Thread.currentThread().getContextClassLoader());
-                            } else {
-                                throw new RuntimeException("cancel task");
-                            }
-                        },
-                        0,
-                        1,
-                        TimeUnit.MILLISECONDS);
+        ScheduledFuture<?> future =
+                akkaRpcService
+                        .getScheduledExecutor()
+                        .scheduleWithFixedDelay(
+                                () -> {
+                                    if (contextClassLoaders.size() < numberOfScheduledRuns) {
+                                        contextClassLoaders.add(
+                                                Thread.currentThread().getContextClassLoader());
+                                    } else {
+                                        throw new RuntimeException("cancel task");
+                                    }
+                                },
+                                0,
+                                1,
+                                TimeUnit.MILLISECONDS);
+        try {
+            future.get();

Review Comment:
   
   
   use assertThatThrownBy
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #21304: [FLINK-30003][rpc] Wait the scheduler future is done before check

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21304:
URL: https://github.com/apache/flink/pull/21304#issuecomment-1312537889

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #21304: [FLINK-30003][rpc] Wait the scheduler future is done before check

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21304:
URL: https://github.com/apache/flink/pull/21304#discussion_r1022244913


##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java:
##########
@@ -157,22 +161,26 @@ void testAkkaRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader
                                 contextClassLoaders.add(
                                         Thread.currentThread().getContextClassLoader());
                             } else {
-                                throw new RuntimeException("cancel task");
+                                terminalFuture.complete(null);

Review Comment:
   updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #21304: [FLINK-30003][rpc] Wait the scheduler future is done before check

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #21304:
URL: https://github.com/apache/flink/pull/21304#issuecomment-1313151567

   Hi @zentol , it's caused by FLINK-29249, please help take a look in your free time, thanks~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #21304: [FLINK-30003][rpc] Wait the scheduler future is done before check

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21304:
URL: https://github.com/apache/flink/pull/21304#issuecomment-1312465970

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "dce74d13ea2cf339d66505b7ba9d95592a3b7131",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dce74d13ea2cf339d66505b7ba9d95592a3b7131",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dce74d13ea2cf339d66505b7ba9d95592a3b7131 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #21304: [FLINK-30003][rpc] Wait the scheduler future is done before check

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21304:
URL: https://github.com/apache/flink/pull/21304#discussion_r1021261838


##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java:
##########
@@ -150,21 +154,28 @@ void testAkkaRpcService_ScheduleRunnableSetsFlinkContextClassLoader()
     void testAkkaRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader() {
         final int numberOfScheduledRuns = 2;
         final List<ClassLoader> contextClassLoaders = new ArrayList<>(numberOfScheduledRuns);
-        akkaRpcService
-                .getScheduledExecutor()
-                .scheduleAtFixedRate(
-                        () -> {
-                            if (contextClassLoaders.size() < numberOfScheduledRuns) {
-                                contextClassLoaders.add(
-                                        Thread.currentThread().getContextClassLoader());
-                            } else {
-                                throw new RuntimeException("cancel task");
-                            }
-                        },
-                        0,
-                        1,
-                        TimeUnit.MILLISECONDS);
-
+        ScheduledFuture<?> future =
+                akkaRpcService
+                        .getScheduledExecutor()
+                        .scheduleAtFixedRate(
+                                () -> {
+                                    if (contextClassLoaders.size() < numberOfScheduledRuns) {
+                                        contextClassLoaders.add(
+                                                Thread.currentThread().getContextClassLoader());
+                                    } else {
+                                        throw new RuntimeException("cancel task");
+                                    }
+                                },
+                                0,
+                                1,
+                                TimeUnit.MILLISECONDS);
+        try {
+            future.get();
+            fail("The future should fail.");

Review Comment:
   Hi @zentol , Thanks for your review, I updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #21304: [FLINK-30003][rpc] Wait the scheduler future is done before check

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21304:
URL: https://github.com/apache/flink/pull/21304#discussion_r1021377774


##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java:
##########
@@ -157,22 +161,26 @@ void testAkkaRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader
                                 contextClassLoaders.add(
                                         Thread.currentThread().getContextClassLoader());
                             } else {
-                                throw new RuntimeException("cancel task");
+                                terminalFuture.complete(null);

Review Comment:
   You should still throw an exception such that the scheduled action stops early.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol merged pull request #21304: [FLINK-30003][rpc] Wait the scheduler future is done before check

Posted by GitBox <gi...@apache.org>.
zentol merged PR #21304:
URL: https://github.com/apache/flink/pull/21304


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org