You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by zh...@apache.org on 2022/11/28 11:17:49 UTC

[incubator-hugegraph-computer] branch close_test_thread created (now 9974442d)

This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a change to branch close_test_thread
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git


      at 9974442d ensure thread can close in test

This branch includes the following new commits:

     new 9974442d ensure thread can close in test

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-hugegraph-computer] 01/01: ensure thread can close in test

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch close_test_thread
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git

commit 9974442dbde08bc372b9d30c745b96d3ed9ca812
Author: coderzc <zh...@apache.org>
AuthorDate: Mon Nov 28 19:17:24 2022 +0800

    ensure thread can close in test
---
 .../computer/algorithm/AlgorithmTestBase.java      | 33 +++++++--------
 .../suite/integrate/SenderIntegrateTest.java       | 49 ++++++++++++++++------
 2 files changed, 53 insertions(+), 29 deletions(-)

diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmTestBase.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmTestBase.java
index 84147345..e1917bd3 100644
--- a/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmTestBase.java
+++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmTestBase.java
@@ -19,9 +19,12 @@
 
 package com.baidu.hugegraph.computer.algorithm;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -38,18 +41,19 @@ import com.baidu.hugegraph.computer.core.util.ComputerContextUtil;
 import com.baidu.hugegraph.computer.core.worker.MockWorkerService;
 import com.baidu.hugegraph.computer.core.worker.WorkerService;
 import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
+import com.google.common.collect.Sets;
 
 public class AlgorithmTestBase extends UnitTestBase {
 
     public static final Logger LOG = Log.logger(AlgorithmTestBase.class);
 
-    public static void runAlgorithm(String algorithmParams, String ... options)
-                                    throws InterruptedException {
+    public static void runAlgorithm(String algorithmParams, String... options)
+            throws InterruptedException, IOException {
         final Logger log = Log.logger(AlgorithmTestBase.class);
         ExecutorService pool = Executors.newFixedThreadPool(2);
         CountDownLatch countDownLatch = new CountDownLatch(2);
         Throwable[] exceptions = new Throwable[2];
-
+        Set<Closeable> closeableSet = Sets.newConcurrentHashSet();
         pool.submit(() -> {
             WorkerService workerService = null;
             try {
@@ -75,7 +79,7 @@ public class AlgorithmTestBase extends UnitTestBase {
                 }
                 Config config = ComputerContextUtil.initContext(params);
                 workerService = new MockWorkerService();
-
+                closeableSet.add(workerService);
                 workerService.init(config);
                 workerService.execute();
             } catch (Throwable e) {
@@ -86,9 +90,6 @@ public class AlgorithmTestBase extends UnitTestBase {
                     countDownLatch.countDown();
                 }
             } finally {
-                if (workerService != null) {
-                    workerService.close();
-                }
                 countDownLatch.countDown();
             }
         });
@@ -120,7 +121,7 @@ public class AlgorithmTestBase extends UnitTestBase {
                 Config config = ComputerContextUtil.initContext(params);
 
                 masterService = new MasterService();
-
+                closeableSet.add(masterService);
                 masterService.init(config);
                 masterService.execute();
             } catch (Throwable e) {
@@ -131,19 +132,17 @@ public class AlgorithmTestBase extends UnitTestBase {
                     countDownLatch.countDown();
                 }
             } finally {
-                /*
-                 * It must close the service first. The pool will be shutdown
-                 * if count down is executed first, and the server thread in
-                 * master service will not be closed.
-                 */
-                if (masterService != null) {
-                    masterService.close();
-                }
                 countDownLatch.countDown();
             }
         });
 
-        countDownLatch.await();
+        try {
+            countDownLatch.await();
+        } finally {
+            for (Closeable closeable : closeableSet) {
+                closeable.close();
+            }
+        }
         pool.shutdownNow();
 
         Assert.assertFalse(Arrays.asList(exceptions).toString(),
diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest.java
index 786c33bb..b5fbf393 100644
--- a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest.java
+++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest.java
@@ -19,10 +19,13 @@
 
 package com.baidu.hugegraph.computer.suite.integrate;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.function.Function;
@@ -49,6 +52,7 @@ import com.baidu.hugegraph.computer.core.network.netty.NettyTransportClient;
 import com.baidu.hugegraph.computer.core.network.session.ClientSession;
 import com.baidu.hugegraph.computer.core.util.ComputerContextUtil;
 import com.baidu.hugegraph.computer.core.worker.WorkerService;
+import com.google.common.collect.Sets;
 
 public class SenderIntegrateTest {
 
@@ -67,7 +71,8 @@ public class SenderIntegrateTest {
     }
 
     @Test
-    public void testOneWorker() {
+    public void testOneWorker() throws IOException {
+        Set<Closeable> closeableSet = Sets.newConcurrentHashSet();
         CompletableFuture<Void> masterFuture = new CompletableFuture<>();
         Thread masterThread = new Thread(() -> {
             String[] args = OptionsBuilder.newInstance()
@@ -86,8 +91,8 @@ public class SenderIntegrateTest {
                                           .withRpcServerPort(0)
                                           .build();
             try (MasterService service = initMaster(args)) {
+                closeableSet.add(service);
                 service.execute();
-                service.close();
                 masterFuture.complete(null);
             } catch (Exception e) {
                 LOG.error("Failed to execute master service", e);
@@ -112,8 +117,8 @@ public class SenderIntegrateTest {
                                           .withTransoprtServerPort(0)
                                           .build();
             try (WorkerService service = initWorker(args)) {
+                closeableSet.add(service);
                 service.execute();
-                service.close();
                 workerFuture.complete(null);
             } catch (Throwable e) {
                 LOG.error("Failed to execute worker service", e);
@@ -124,13 +129,20 @@ public class SenderIntegrateTest {
         masterThread.start();
         workerThread.start();
 
-        CompletableFuture.allOf(workerFuture, masterFuture).join();
+        try {
+            CompletableFuture.allOf(workerFuture, masterFuture).join();
+        } finally {
+            for (Closeable closeable : closeableSet) {
+                closeable.close();
+            }
+        }
     }
 
     @Test
-    public void testMultiWorkers() {
+    public void testMultiWorkers() throws IOException {
         int workerCount = 3;
         int partitionCount = 3;
+        Set<Closeable> closeableSet = Sets.newConcurrentHashSet();
         CompletableFuture<Void> masterFuture = new CompletableFuture<>();
         Thread masterThread = new Thread(() -> {
             String[] args = OptionsBuilder.newInstance()
@@ -148,8 +160,8 @@ public class SenderIntegrateTest {
                                           .build();
             try {
                 MasterService service = initMaster(args);
+                closeableSet.add(service);
                 service.execute();
-                service.close();
                 masterFuture.complete(null);
             } catch (Throwable e) {
                 LOG.error("Failed to execute master service", e);
@@ -180,8 +192,8 @@ public class SenderIntegrateTest {
                         .build();
                 try {
                     WorkerService service = initWorker(args);
+                    closeableSet.add(service);
                     service.execute();
-                    service.close();
                     workerFuture.complete(null);
                 } catch (Throwable e) {
                     LOG.error("Failed to execute worker service", e);
@@ -199,11 +211,18 @@ public class SenderIntegrateTest {
 
         List<CompletableFuture<Void>> futures = new ArrayList<>(workers.values());
         futures.add(masterFuture);
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+        try {
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+        } finally {
+            for (Closeable closeable : closeableSet) {
+                closeable.close();
+            }
+        }
     }
 
     @Test
-    public void testOneWorkerWithBusyClient() throws InterruptedException {
+    public void testOneWorkerWithBusyClient() throws IOException {
+        Set<Closeable> closeableSet = Sets.newConcurrentHashSet();
         CompletableFuture<Void> masterFuture = new CompletableFuture<>();
         Thread masterThread = new Thread(() -> {
             String[] args = OptionsBuilder.newInstance()
@@ -221,8 +240,8 @@ public class SenderIntegrateTest {
                                           .withRpcServerPort(0)
                                           .build();
             try (MasterService service = initMaster(args)) {
+                closeableSet.add(service);
                 service.execute();
-                service.close();
                 masterFuture.complete(null);
             } catch (Throwable e) {
                 LOG.error("Failed to execute master service", e);
@@ -248,10 +267,10 @@ public class SenderIntegrateTest {
                                           .withTransoprtServerPort(transoprtServerPort)
                                           .build();
             try (WorkerService service = initWorker(args)) {
+                closeableSet.add(service);
                 // Let send rate slowly
                 this.slowSendFunc(service, transoprtServerPort);
                 service.execute();
-                service.close();
                 workerFuture.complete(null);
             } catch (Throwable e) {
                 LOG.error("Failed to execute worker service", e);
@@ -262,7 +281,13 @@ public class SenderIntegrateTest {
         masterThread.start();
         workerThread.start();
 
-        CompletableFuture.allOf(workerFuture, masterFuture).join();
+        try {
+            CompletableFuture.allOf(workerFuture, masterFuture).join();
+        } finally {
+            for (Closeable closeable : closeableSet) {
+                closeable.close();
+            }
+        }
     }
 
     private void slowSendFunc(WorkerService service, int port) throws TransportException {