You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/08 20:00:24 UTC

[GitHub] [flink] reswqa opened a new pull request, #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

reswqa opened a new pull request, #20229:
URL: https://github.com/apache/flink/pull/20229

   ## What is the purpose of the change
   
   Let `CheckedThread` supports pass a `RunnableWithException` to constructor, this will make it behave more like a thread.
   After this pr, we can use `CheckedThread` as the same way of using `Thread` like: `new Thread(Runnable runnable)` or
   `new Thread()` and then override `Thread.run()` method. Just change it to `CheckedThread(RunnableWithException runnableWithException)` or `new CheckedThread()` and then override `CheckedThread.go()` method.
   
   
   ## Brief change log
   
     - *Let `CheckedThread` supports pass a `RunnableWithException` to constructor*
     - *All anonymous inner class based on it are replaced by lambda expressions constructor*
     - *Add test for CheckedThread*
   
   
   ## Verifying this change
   
   This change added tests and can be verified by `CheckedThreadTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r918116847


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   In addition, I still want to know the clear benefits of removing the override go method. In my mind, it is more reasonable for us to use `CheckedThread` like using the `Thread`. When using thread, we usually build it in two ways: one is pass the runnable object to constructor, and the other is to override its run method. So the only difference is that the `run` method is replaced by the `go` method. Previously, `CheckedThread` did not support constructed by runnable, but now we can complete this function through this pr.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r917869284


##########
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java:
##########
@@ -142,18 +142,16 @@ public void testStreaming() throws Exception {
         submitJobAndVerifyResults(jobGraph);
     }
 
-    private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
+    private void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {

Review Comment:
   Here is to use `getClass().getClassLoader()`, perhaps it is more appropriate to use `AccumulatorLiveITCase.class.getClassLoader()` or `CheckedThread.class.getClassLoader()` directly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r920262106


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   After reading your opinion, I now think it is appropriate to remove the `go` method, so that our class will become very simple and easy to use.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on PR #20229:
URL: https://github.com/apache/flink/pull/20229#issuecomment-1183428549

   @zentol Thank you for your excellent suggestions and patient review. According to your comments, I updated this pull request, PTAL~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r918081449


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   Nice idea, I tried to get rid of the support for overriding the go method, and then found that a more difficult class might not be easy to deal with. It is `org apache. flink. streaming. runtime. io. benchmark. ReceiverThread`,  do you have any suggestions?



##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   Nice idea, But I tried to get rid of the support for overriding the go method, and then found that a more difficult class might not be easy to deal with. It is `org apache. flink. streaming. runtime. io. benchmark. ReceiverThread`,  do you have any suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r920140769


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   There shouldn't be 2 ways to achieve the same thing; it just makes the class more complicated. As is you can create a CheckedThread without implementing `go()` nor providing a `Runnable` (or the exact opposite, doing both!), making it unclear how you are even supposed to implement it
   
   `Thread#run` possibly exists because at the time it was added passing a `Runnable` via the constructor just wasn't possible. `run()/go()` have always been problematic because they are methods visible to the user, despite the user not being supposed to ever call them.
   
   ReceiverThread is easy to migrate. Just encapsulate all the logic from the thread into a separate class, and have the `Runnable` refer to that. (See below).
   
   ```
   public class ReceiverThread extends CheckedThread {
       protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
   
       private final State state;
   
       protected ReceiverThread(State state) {
           super(
                   () -> {
                       try {
                           while (state.running) {
                               state.readRecords(state.getExpectedRecord().get());
                               state.finishProcessingExpectedRecords();
                           }
                       } catch (InterruptedException e) {
                           if (state.running) {
                               throw e;
                           }
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
                   });
           setName(this.getClass().getName());
           this.state = state;
       }
   
       public synchronized CompletableFuture<?> setExpectedRecord(long record) {
           return state.setExpectedRecord(record);
       }
   
       public void shutdown() {
           state.running = false;
           interrupt();
           state.expectedRecord.complete(0L);
       }
   
       protected abstract static class State {
           protected final int expectedRepetitionsOfExpectedRecord;
   
           protected int expectedRecordCounter;
           protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
           protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
   
           protected volatile boolean running;
   
           protected State(int expectedRepetitionsOfExpectedRecord) {
               this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
           }
   
           public synchronized CompletableFuture<?> setExpectedRecord(long record) {
               checkState(!expectedRecord.isDone());
               checkState(!recordsProcessed.isDone());
               expectedRecord.complete(record);
               expectedRecordCounter = 0;
               return recordsProcessed;
           }
   
           private synchronized CompletableFuture<Long> getExpectedRecord() {
               return expectedRecord;
           }
   
           protected abstract void readRecords(long lastExpectedRecord) throws Exception;
   
           private synchronized void finishProcessingExpectedRecords() {
               checkState(expectedRecord.isDone());
               checkState(!recordsProcessed.isDone());
   
               recordsProcessed.complete(null);
               expectedRecord = new CompletableFuture<>();
               recordsProcessed = new CompletableFuture<>();
           }
       }
   }
   ```
   
   ```
   public class SerializingLongReceiver extends ReceiverThread {
   
       private SerializingLongReceiver(State state) {
           super(state);
       }
   
       public static SerializingLongReceiver create(
               InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
           return new SerializingLongReceiver(
                   new State(inputGate, expectedRepetitionsOfExpectedRecord));
       }
   
       private static class State extends ReceiverThread.State {
   
           private final MutableRecordReader<LongValue> reader;
   
           private State(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
               super(expectedRepetitionsOfExpectedRecord);
               this.reader =
                       new MutableRecordReader<>(
                               inputGate,
                               new String[] {EnvironmentInformation.getTemporaryFileDirectory()});
           }
   
           @Override
           protected void readRecords(long lastExpectedRecord) throws Exception {
               LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
               final LongValue value = new LongValue();
   
               while (running && reader.next(value)) {
                   final long ts = value.getValue();
                   if (ts == lastExpectedRecord) {
                       expectedRecordCounter++;
                       if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
                           break;
                       }
                   }
               }
           }
       }
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r918081449


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   Nice idea, But I tried to get rid of the support for overriding the go method, and then found that a more difficult class might not be easy to deal with. It is `org apache.flink.streaming. runtime.io.benchmark.ReceiverThread`,  do you have any suggestions?



##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   Nice idea, But I tried to get rid of the support for overriding the go method, and then found that a more difficult class might not be easy to deal with. It is `org.apache.flink.streaming. runtime.io.benchmark.ReceiverThread`,  do you have any suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r917850634


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -120,7 +154,7 @@ private void checkError() throws Exception {
 
     private void checkFinished() throws Exception {
         if (getState() != State.TERMINATED) {
-            throw new Exception(
+            throw new TimeoutException(

Review Comment:
   I think maybe it's more reasonable to use TimeoutException here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r918081449


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   Nice idea, But I tried to get rid of the support for overriding the go method, and then found that a more difficult class might not be easy to deal with. It is `org.apache.flink.streaming.runtime.io.benchmark.ReceiverThread`,  do you have any suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20229:
URL: https://github.com/apache/flink/pull/20229#issuecomment-1179323284

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bfb8415d3cf9b0d09e49e671ab6e8c93c815bd7c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bfb8415d3cf9b0d09e49e671ab6e8c93c815bd7c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bfb8415d3cf9b0d09e49e671ab6e8c93c815bd7c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r918012877


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   The BlockingWriterThread case can be solved quite easily with a factory method and passing the latch via the constructor.
   
   ```
       static BlockingWriterThread createBlockingWriterThread(
               LimitedConnectionsFileSystem fs,
               Path path,
               int maxConcurrentOutputStreams,
               int maxConcurrentStreamsTotal) {
           final OneShotLatch waiter = new OneShotLatch();
   
           return new BlockingThread(
                   () -> {
                       try (FSDataOutputStream stream = fs.create(path, WriteMode.OVERWRITE)) {
                           assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
                           assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
   
                           final Random rnd = new Random();
                           final byte[] data = new byte[rnd.nextInt(10000) + 1];
                           rnd.nextBytes(data);
                           stream.write(data);
   
                           waiter.await();
   
                           // try to write one more thing, which might/should fail with an I/O
                           // exception
                           stream.write(rnd.nextInt());
                       }
                   },
                   waiter);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r918012877


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   The BlockingWriterThread case can be solved quite easily with a factory method, creating the latch before thread (with it being used directly for waiting) while passing the latch via the constructor for the await method.
   
   ```
       static BlockingWriterThread createBlockingWriterThread(
               LimitedConnectionsFileSystem fs,
               Path path,
               int maxConcurrentOutputStreams,
               int maxConcurrentStreamsTotal) {
           final OneShotLatch waiter = new OneShotLatch();
   
           return new BlockingThread(
                   () -> {
                       try (FSDataOutputStream stream = fs.create(path, WriteMode.OVERWRITE)) {
                           assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
                           assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);
   
                           final Random rnd = new Random();
                           final byte[] data = new byte[rnd.nextInt(10000) + 1];
                           rnd.nextBytes(data);
                           stream.write(data);
   
                           waiter.await();
   
                           // try to write one more thing, which might/should fail with an I/O
                           // exception
                           stream.write(rnd.nextInt());
                       }
                   },
                   waiter);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r918060780


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RunnableWithException.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+@FunctionalInterface
+public interface RunnableWithException {
+
+    /**
+     * The work method.
+     *
+     * @throws Exception Exceptions may be thrown.
+     */
+    void run() throws Exception;
+}

Review Comment:
   In fact, I copied the code from `org.apache.flink.util.function.RunnableWithException`, but your suggestion should be more reasonable. I will copy `org.apache.flink.util.function.ThrowingRunnable` directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on PR #20229:
URL: https://github.com/apache/flink/pull/20229#issuecomment-1192313172

   @zentol, sorry to bother you, this PR has been modified according to the previous discussion. Can you help me have a look again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r917810506


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   we can make this final, remove `go()` and for force `runnable` to be non-null.



##########
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java:
##########
@@ -142,18 +142,16 @@ public void testStreaming() throws Exception {
         submitJobAndVerifyResults(jobGraph);
     }
 
-    private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
+    private void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {

Review Comment:
   ?



##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -120,7 +154,7 @@ private void checkError() throws Exception {
 
     private void checkFinished() throws Exception {
         if (getState() != State.TERMINATED) {
-            throw new Exception(
+            throw new TimeoutException(

Review Comment:
   ?



##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RunnableWithException.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+@FunctionalInterface
+public interface RunnableWithException {
+
+    /**
+     * The work method.
+     *
+     * @throws Exception Exceptions may be thrown.
+     */
+    void run() throws Exception;
+}

Review Comment:
   Why aren't you copying the definition from `ThrowingRunnable`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r917878601


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   I think when using `CheckedThread` you should keep the habit of using `Thread`: pass in a runnable or override the go method.
   Another reason is that I found that many test classes such as: `org.apache.flink.core.fs.LimitedConnectionsFileSystemTest.BlockingWriterThread` will be ugly if they do not support overriding the go method: I thought about introducing a class `BlockingWriterRunnable` extends `RunnableWithExcpetion`, and then let the field of BlockingWriterThread and the logic of the go method are transferred to BlockingWriterRunnable, and then replaced it with `new CheckedThrad(new BlockingWriterRunnable(xxx) )` where `BlockingWriterThread` is used. This is very inconvenient to use, because the test class will call the `sync` on `CheckedThread`, and also call the `wakeup` method on `BlockingWriterRunnable`,  but in the original implementation, these methods are all on `BlockingWriterThread`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r917852362


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RunnableWithException.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+@FunctionalInterface
+public interface RunnableWithException {
+
+    /**
+     * The work method.
+     *
+     * @throws Exception Exceptions may be thrown.
+     */
+    void run() throws Exception;
+}

Review Comment:
   Because flink-core is not include in CheckedThread's module(flink-test-utils-junit), or It should be said that all Flink related dependencies are not included in it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r917869284


##########
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java:
##########
@@ -142,18 +142,16 @@ public void testStreaming() throws Exception {
         submitJobAndVerifyResults(jobGraph);
     }
 
-    private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
+    private void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {

Review Comment:
   Here is for use `getClass().getClassLoader()` in lambda expression, perhaps it is more appropriate to use `AccumulatorLiveITCase.class.getClassLoader()` or `CheckedThread.class.getClassLoader()` directly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r918004905


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RunnableWithException.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+@FunctionalInterface
+public interface RunnableWithException {
+
+    /**
+     * The work method.
+     *
+     * @throws Exception Exceptions may be thrown.
+     */
+    void run() throws Exception;
+}

Review Comment:
   I'm aware that we can't use the implementation from flink-core; what I was asking is why this class, which has the exact same requirements as the ThrowingRunnable, is implemented differently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r920265715


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   @zentol The migration suggestion you gave is really good. I have learned a lot from you, and thank you sincerely~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on a diff in pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r920265715


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   @zentol The migration suggestion you gave is really benefits me a lot. I have learned too much from you, and thank you sincerely~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa closed pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa closed pull request #20229: [FLINK-28399] Refactor CheckedThread to accept runnable via constructor
URL: https://github.com/apache/flink/pull/20229


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org