You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/11 05:24:24 UTC
[incubator-streampark] branch dev updated: [Improve] Improve the unit test and comments of CompletableFutureUtils (#1568)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new f39b45b95 [Improve] Improve the unit test and comments of CompletableFutureUtils (#1568)
f39b45b95 is described below
commit f39b45b958be2cafbb11b9c9d43e93f28adbbaf1
Author: 1996fanrui <le...@shopee.com>
AuthorDate: Sun Sep 11 13:24:18 2022 +0800
[Improve] Improve the unit test and comments of CompletableFutureUtils (#1568)
---
.../common/util/CompletableFutureUtils.scala | 10 ++
.../src/test/java/CompletableFutureTest.java | 117 ++++++++++++++++-----
2 files changed, 102 insertions(+), 25 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala
index baf7dcee7..b5cb02c5b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala
@@ -55,6 +55,16 @@ object CompletableFutureUtils {
future.applyToEither(setTimeout(timeout, unit), handle).exceptionally(exceptionally)
}
+ /**
+ * Callback handler when the future is done before timeout. Callback exceptionally when the future completed exceptionally or future isn't
+ * done after timeout.
+ *
+ * @param future The future that needs to be watched.
+ * @param timeout timeout
+ * @param unit timeout unit
+ * @param handle The handler will be called when future complete normally before timeout.
+ * @param exceptionally The exceptionally will be called when future completed exceptionally or future isn't done after timeout.
+ */
def runTimeout[T](future: CompletableFuture[T],
timeout: Long,
unit: TimeUnit,
diff --git a/streampark-console/streampark-console-service/src/test/java/CompletableFutureTest.java b/streampark-console/streampark-console-service/src/test/java/CompletableFutureTest.java
index 08945988b..0637e4d38 100644
--- a/streampark-console/streampark-console-service/src/test/java/CompletableFutureTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/CompletableFutureTest.java
@@ -15,6 +15,9 @@
* limitations under the License.
*/
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.junit.Test;
@@ -22,61 +25,125 @@ import org.junit.Test;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
public class CompletableFutureTest {
@Test
- public void test() throws Exception {
+ public void testStartJobNormally() throws Exception {
+ // It takes 5 seconds to start job.
+ CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> runStart(5));
+
+ // Stop job after 8 seconds.
+ CompletableFuture.runAsync(() -> runStop(future, 8));
+
+ AtomicBoolean isStartNormal = new AtomicBoolean(false);
+ AtomicBoolean isStartException = new AtomicBoolean(false);
+
+ // Set the timeout is 10 seconds for start job.
+ CompletableFutureUtils.runTimeout(
+ future,
+ 10L,
+ TimeUnit.SECONDS,
+ r -> isStartNormal.set(true),
+ e -> isStartException.set(true)
+ ).get();
- //启动任务需要10秒...
+ assertTrue(future.isDone());
+ assertTrue(isStartNormal.get());
+ assertFalse(isStartException.get());
+ }
+
+ @Test
+ public void testStopJobEarly() throws Exception {
+ // It takes 10 seconds to start job.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> runStart(10));
- // 5秒之后停止任务.
+ // Stop job after 5 seconds.
CompletableFuture.runAsync(() -> runStop(future, 5));
- //设置启动超时时间为20秒
+ AtomicBoolean isStartNormal = new AtomicBoolean(false);
+ AtomicBoolean isStartException = new AtomicBoolean(false);
+
+ // Set the timeout is 15 seconds for start job.
CompletableFutureUtils.runTimeout(
future,
- 20L,
+ 15L,
TimeUnit.SECONDS,
- r -> System.out.println(r),
+ r -> {
+ isStartNormal.set(true);
+ throw new IllegalStateException("It shouldn't be called due to the job is stopped before the timeout.");
+ },
e -> {
- if (e.getCause() instanceof CancellationException) {
- System.out.println("任务被终止....");
- } else {
- e.printStackTrace();
- future.cancel(true);
- System.out.println("start timeout");
- }
+ isStartException.set(true);
+ assertTrue(future.isCancelled());
+ assertTrue(e.getCause() instanceof CancellationException);
+ System.out.println("The future is cancelled.");
}
).get();
- if (future.isCancelled()) {
- System.out.println("cancelled...");
- }
+ assertTrue(future.isCancelled());
+ assertFalse(isStartNormal.get());
+ assertTrue(isStartException.get());
}
+ @Test
+ public void testStartJobTimeout() throws Exception {
+
+ // It takes 10 seconds to start job.
+ CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> runStart(10));
+
+ // Stop job after 15 seconds.
+ CompletableFuture.runAsync(() -> runStop(future, 15));
+
+ AtomicBoolean isStartNormal = new AtomicBoolean(false);
+ AtomicBoolean isStartException = new AtomicBoolean(false);
+
+ // Set the timeout is 5 seconds for start job.
+ CompletableFutureUtils.runTimeout(
+ future,
+ 5L,
+ TimeUnit.SECONDS,
+ r -> {
+ isStartNormal.set(true);
+ throw new IllegalStateException("It shouldn't be called due to the job is timed out.");
+ },
+ e -> {
+ isStartException.set(true);
+ assertFalse(future.isDone());
+ assertTrue(e.getCause() instanceof TimeoutException);
+ future.cancel(true);
+ System.out.println("Future is timed out.");
+ }
+ ).get();
+ assertTrue(future.isCancelled());
+ assertFalse(isStartNormal.get());
+ assertTrue(isStartException.get());
+ }
+
+ /**
+ * Cancel the future after sec seconds.
+ */
private void runStop(CompletableFuture<String> future, int sec) {
try {
- Thread.sleep(sec * 1000);
+ Thread.sleep(sec * 1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
- if (!future.isCancelled()) {
- System.out.println("强行停止任务");
- future.cancel(true);
+ if (future.isDone()) {
+ System.out.println("The future is done.");
} else {
- System.out.println("任务正常停止...");
+ System.out.println("Force cancel future.");
+ future.cancel(true);
}
}
/**
- * 模拟启动需要30秒钟.
- *
- * @return
+ * Start job, it will take sec seconds.
*/
private String runStart(int sec) {
try {
- Thread.sleep(sec * 1000);
+ Thread.sleep(sec * 1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}