You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/11 05:24:24 UTC

[incubator-streampark] branch dev updated: [Improve] Improve the unit test and comments of CompletableFutureUtils (#1568)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new f39b45b95 [Improve] Improve the unit test and comments of CompletableFutureUtils (#1568)
f39b45b95 is described below

commit f39b45b958be2cafbb11b9c9d43e93f28adbbaf1
Author: 1996fanrui <le...@shopee.com>
AuthorDate: Sun Sep 11 13:24:18 2022 +0800

    [Improve] Improve the unit test and comments of CompletableFutureUtils (#1568)
---
 .../common/util/CompletableFutureUtils.scala       |  10 ++
 .../src/test/java/CompletableFutureTest.java       | 117 ++++++++++++++++-----
 2 files changed, 102 insertions(+), 25 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala
index baf7dcee7..b5cb02c5b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala
@@ -55,6 +55,16 @@ object CompletableFutureUtils {
     future.applyToEither(setTimeout(timeout, unit), handle).exceptionally(exceptionally)
   }
 
+  /**
+   * Callback handler when the future is done before timeout. Callback exceptionally when the future completed exceptionally or future isn't
+   * done after timeout.
+   *
+   * @param future        The future that needs to be watched.
+   * @param timeout       timeout
+   * @param unit          timeout unit
+   * @param handle        The handler will be called when future complete normally before timeout.
+   * @param exceptionally The exceptionally will be called when future completed exceptionally or future isn't done after timeout.
+   */
   def runTimeout[T](future: CompletableFuture[T],
                     timeout: Long,
                     unit: TimeUnit,
diff --git a/streampark-console/streampark-console-service/src/test/java/CompletableFutureTest.java b/streampark-console/streampark-console-service/src/test/java/CompletableFutureTest.java
index 08945988b..0637e4d38 100644
--- a/streampark-console/streampark-console-service/src/test/java/CompletableFutureTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/CompletableFutureTest.java
@@ -15,6 +15,9 @@
  * limitations under the License.
  */
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import org.apache.streampark.common.util.CompletableFutureUtils;
 
 import org.junit.Test;
@@ -22,61 +25,125 @@ import org.junit.Test;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CompletableFutureTest {
 
     @Test
-    public void test() throws Exception {
+    public void testStartJobNormally() throws Exception {
+        // It takes 5 seconds to start job.
+        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> runStart(5));
+
+        // Stop job after 8 seconds.
+        CompletableFuture.runAsync(() -> runStop(future, 8));
+
+        AtomicBoolean isStartNormal = new AtomicBoolean(false);
+        AtomicBoolean isStartException = new AtomicBoolean(false);
+
+        // Set the timeout is 10 seconds for start job.
+        CompletableFutureUtils.runTimeout(
+            future,
+            10L,
+            TimeUnit.SECONDS,
+            r -> isStartNormal.set(true),
+            e -> isStartException.set(true)
+        ).get();
 
-        //启动任务需要10秒...
+        assertTrue(future.isDone());
+        assertTrue(isStartNormal.get());
+        assertFalse(isStartException.get());
+    }
+
+    @Test
+    public void testStopJobEarly() throws Exception {
+        // It takes 10 seconds to start job.
         CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> runStart(10));
 
-        // 5秒之后停止任务.
+        // Stop job after 5 seconds.
         CompletableFuture.runAsync(() -> runStop(future, 5));
 
-        //设置启动超时时间为20秒
+        AtomicBoolean isStartNormal = new AtomicBoolean(false);
+        AtomicBoolean isStartException = new AtomicBoolean(false);
+
+        // Set the timeout is 15 seconds for start job.
         CompletableFutureUtils.runTimeout(
             future,
-            20L,
+            15L,
             TimeUnit.SECONDS,
-            r -> System.out.println(r),
+            r -> {
+                isStartNormal.set(true);
+                throw new IllegalStateException("It shouldn't be called due to the job is stopped before the timeout.");
+            },
             e -> {
-                if (e.getCause() instanceof CancellationException) {
-                    System.out.println("任务被终止....");
-                } else {
-                    e.printStackTrace();
-                    future.cancel(true);
-                    System.out.println("start timeout");
-                }
+                isStartException.set(true);
+                assertTrue(future.isCancelled());
+                assertTrue(e.getCause() instanceof CancellationException);
+                System.out.println("The future is cancelled.");
             }
         ).get();
-        if (future.isCancelled()) {
-            System.out.println("cancelled...");
-        }
+        assertTrue(future.isCancelled());
+        assertFalse(isStartNormal.get());
+        assertTrue(isStartException.get());
     }
 
+    @Test
+    public void testStartJobTimeout() throws Exception {
+
+        // It takes 10 seconds to start job.
+        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> runStart(10));
+
+        // Stop job after 15 seconds.
+        CompletableFuture.runAsync(() -> runStop(future, 15));
+
+        AtomicBoolean isStartNormal = new AtomicBoolean(false);
+        AtomicBoolean isStartException = new AtomicBoolean(false);
+
+        // Set the timeout is 5 seconds for start job.
+        CompletableFutureUtils.runTimeout(
+            future,
+            5L,
+            TimeUnit.SECONDS,
+            r -> {
+                isStartNormal.set(true);
+                throw new IllegalStateException("It shouldn't be called due to the job is timed out.");
+            },
+            e -> {
+                isStartException.set(true);
+                assertFalse(future.isDone());
+                assertTrue(e.getCause() instanceof TimeoutException);
+                future.cancel(true);
+                System.out.println("Future is timed out.");
+            }
+        ).get();
+        assertTrue(future.isCancelled());
+        assertFalse(isStartNormal.get());
+        assertTrue(isStartException.get());
+    }
+
+    /**
+     * Cancel the future after sec seconds.
+     */
     private void runStop(CompletableFuture<String> future, int sec) {
         try {
-            Thread.sleep(sec * 1000);
+            Thread.sleep(sec * 1000L);
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
-        if (!future.isCancelled()) {
-            System.out.println("强行停止任务");
-            future.cancel(true);
+        if (future.isDone()) {
+            System.out.println("The future is done.");
         } else {
-            System.out.println("任务正常停止...");
+            System.out.println("Force cancel future.");
+            future.cancel(true);
         }
     }
 
     /**
-     * 模拟启动需要30秒钟.
-     *
-     * @return
+     * Start job, it will take sec seconds.
      */
     private String runStart(int sec) {
         try {
-            Thread.sleep(sec * 1000);
+            Thread.sleep(sec * 1000L);
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }