You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by zh...@apache.org on 2022/11/30 16:09:39 UTC

[incubator-hugegraph-computer] branch master updated: fix(test): Ensure resource can be closed when test failed (#210)

This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fecd258 fix(test): Ensure resource can be closed when test failed (#210)
3fecd258 is described below

commit 3fecd258c15ef93fe89e5ae25ecf6e38ce3f190a
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Dec 1 00:09:33 2022 +0800

    fix(test): Ensure resource can be closed when test failed (#210)
---
 .../computer/core/master/MasterService.java        |  2 +-
 .../computer/core/worker/WorkerService.java        |  2 +-
 .../computer/algorithm/AlgorithmTestBase.java      | 35 +++++++--------
 .../core/network/netty/AbstractNetworkTest.java    |  2 +-
 .../network/netty/NettyTransportClientTest.java    |  7 +--
 .../hugegraph/computer/k8s/MiniKubeTest.java       |  2 +
 .../suite/integrate/SenderIntegrateTest.java       | 50 +++++++++++++++++-----
 7 files changed, 64 insertions(+), 36 deletions(-)

diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
index 91164d40..58c2bbce 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
@@ -146,7 +146,7 @@ public class MasterService implements Closeable {
      * {@link #init(Config)}.
      */
     @Override
-    public void close() {
+    public synchronized void close() {
         this.checkInited();
         if (this.closed) {
             LOG.info("{} MasterService had closed before", this);
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
index 0ca45ed2..d7f5f1ab 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
@@ -157,7 +157,7 @@ public class WorkerService implements Closeable {
      * {@link #init(Config)}.
      */
     @Override
-    public void close() {
+    public synchronized void close() {
         this.checkInited();
         if (this.closed) {
             LOG.info("{} WorkerService had closed before", this);
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java b/computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java
index 283b9439..b68600d5 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java
@@ -22,9 +22,11 @@ package org.apache.hugegraph.computer.algorithm;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hugegraph.computer.core.config.ComputerOptions;
 import org.apache.hugegraph.computer.core.config.Config;
@@ -38,17 +40,20 @@ import org.apache.hugegraph.testutil.Assert;
 import org.apache.hugegraph.util.Log;
 import org.slf4j.Logger;
 
+import com.google.common.collect.Sets;
+
 public class AlgorithmTestBase extends UnitTestBase {
 
     public static final Logger LOG = Log.logger(AlgorithmTestBase.class);
 
-    public static void runAlgorithm(String algorithmParams, String ... options)
-                                    throws InterruptedException {
+    public static void runAlgorithm(String algorithmParams, String... options)
+            throws InterruptedException {
         final Logger log = Log.logger(AlgorithmTestBase.class);
         ExecutorService pool = Executors.newFixedThreadPool(2);
         CountDownLatch countDownLatch = new CountDownLatch(2);
         Throwable[] exceptions = new Throwable[2];
-
+        AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
+        Set<WorkerService> workerServices = Sets.newConcurrentHashSet();
         pool.submit(() -> {
             WorkerService workerService = null;
             try {
@@ -74,7 +79,7 @@ public class AlgorithmTestBase extends UnitTestBase {
                 }
                 Config config = ComputerContextUtil.initContext(params);
                 workerService = new MockWorkerService();
-
+                workerServices.add(workerService);
                 workerService.init(config);
                 workerService.execute();
             } catch (Throwable e) {
@@ -85,9 +90,6 @@ public class AlgorithmTestBase extends UnitTestBase {
                     countDownLatch.countDown();
                 }
             } finally {
-                if (workerService != null) {
-                    workerService.close();
-                }
                 countDownLatch.countDown();
             }
         });
@@ -119,7 +121,7 @@ public class AlgorithmTestBase extends UnitTestBase {
                 Config config = ComputerContextUtil.initContext(params);
 
                 masterService = new MasterService();
-
+                masterServiceRef.set(masterService);
                 masterService.init(config);
                 masterService.execute();
             } catch (Throwable e) {
@@ -130,19 +132,18 @@ public class AlgorithmTestBase extends UnitTestBase {
                     countDownLatch.countDown();
                 }
             } finally {
-                /*
-                 * It must close the service first. The pool will be shutdown
-                 * if count down is executed first, and the server thread in
-                 * master service will not be closed.
-                 */
-                if (masterService != null) {
-                    masterService.close();
-                }
                 countDownLatch.countDown();
             }
         });
 
-        countDownLatch.await();
+        try {
+            countDownLatch.await();
+        } finally {
+            for (WorkerService workerService : workerServices) {
+                workerService.close();
+            }
+            masterServiceRef.get().close();
+        }
         pool.shutdownNow();
 
         Assert.assertFalse(Arrays.asList(exceptions).toString(),
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/AbstractNetworkTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/AbstractNetworkTest.java
index e40fcb00..efde0efb 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/AbstractNetworkTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/AbstractNetworkTest.java
@@ -50,7 +50,7 @@ import org.mockito.Mockito;
 
 import io.netty.bootstrap.ServerBootstrap;
 
-public abstract class AbstractNetworkTest {
+public abstract class AbstractNetworkTest extends UnitTestBase {
 
     private static final Map<ConfigOption<?>, String> OPTIONS = new HashMap<>();
     protected static Config config;
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
index f2a6c1b9..eb4a4f88 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
@@ -315,11 +315,8 @@ public class NettyTransportClientTest extends AbstractNetworkTest {
 
         client.startSession();
 
-        Mockito.doAnswer(invocationOnMock -> {
-            invocationOnMock.callRealMethod();
-            throw new RuntimeException("test exception");
-        }).when(serverHandler)
-          .handle(Mockito.any(), Mockito.anyInt(), Mockito.any());
+        Mockito.doThrow(new RuntimeException("test exception")).when(serverHandler)
+               .handle(Mockito.any(), Mockito.anyInt(), Mockito.any());
 
         ByteBuffer buffer = ByteBuffer.wrap(StringEncoding.encode("test data"));
         boolean send = client.send(MessageType.MSG, 1, buffer);
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/MiniKubeTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/MiniKubeTest.java
index 9c77af6e..a5bcf029 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/MiniKubeTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/MiniKubeTest.java
@@ -251,6 +251,8 @@ public class MiniKubeTest extends AbstractK8sTest {
 
         this.driver.cancelJob(jobId, params);
 
+        UnitTestBase.sleep(1500L);
+
         DefaultJobState jobState2 = new DefaultJobState();
         jobState2.jobStatus(JobStatus.CANCELLED);
         Mockito.verify(jobObserver, Mockito.timeout(15000L).atLeast(1))
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java
index 293fcaa3..30d21bda 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java
@@ -19,12 +19,15 @@
 
 package org.apache.hugegraph.computer.suite.integrate;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import org.apache.hugegraph.computer.algorithm.centrality.pagerank.PageRankParams;
@@ -49,6 +52,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 
+import com.google.common.collect.Sets;
+
 public class SenderIntegrateTest {
 
     public static final Logger LOG = Log.logger(SenderIntegrateTest.class);
@@ -67,6 +72,8 @@ public class SenderIntegrateTest {
 
     @Test
     public void testOneWorker() {
+        AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
+        AtomicReference<WorkerService> workerServiceRef = new AtomicReference<>();
         CompletableFuture<Void> masterFuture = new CompletableFuture<>();
         Thread masterThread = new Thread(() -> {
             String[] args = OptionsBuilder.newInstance()
@@ -85,8 +92,8 @@ public class SenderIntegrateTest {
                                           .withRpcServerPort(0)
                                           .build();
             try (MasterService service = initMaster(args)) {
+                masterServiceRef.set(service);
                 service.execute();
-                service.close();
                 masterFuture.complete(null);
             } catch (Exception e) {
                 LOG.error("Failed to execute master service", e);
@@ -111,8 +118,8 @@ public class SenderIntegrateTest {
                                           .withTransoprtServerPort(0)
                                           .build();
             try (WorkerService service = initWorker(args)) {
+                workerServiceRef.set(service);
                 service.execute();
-                service.close();
                 workerFuture.complete(null);
             } catch (Throwable e) {
                 LOG.error("Failed to execute worker service", e);
@@ -123,13 +130,20 @@ public class SenderIntegrateTest {
         masterThread.start();
         workerThread.start();
 
-        CompletableFuture.allOf(workerFuture, masterFuture).join();
+        try {
+            CompletableFuture.allOf(workerFuture, masterFuture).join();
+        } finally {
+            workerServiceRef.get().close();
+            masterServiceRef.get().close();
+        }
     }
 
     @Test
-    public void testMultiWorkers() {
+    public void testMultiWorkers() throws IOException {
         int workerCount = 3;
         int partitionCount = 3;
+        AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
+        Set<WorkerService> workerServices = Sets.newConcurrentHashSet();
         CompletableFuture<Void> masterFuture = new CompletableFuture<>();
         Thread masterThread = new Thread(() -> {
             String[] args = OptionsBuilder.newInstance()
@@ -147,8 +161,8 @@ public class SenderIntegrateTest {
                                           .build();
             try {
                 MasterService service = initMaster(args);
+                masterServiceRef.set(service);
                 service.execute();
-                service.close();
                 masterFuture.complete(null);
             } catch (Throwable e) {
                 LOG.error("Failed to execute master service", e);
@@ -179,8 +193,8 @@ public class SenderIntegrateTest {
                         .build();
                 try {
                     WorkerService service = initWorker(args);
+                    workerServices.add(service);
                     service.execute();
-                    service.close();
                     workerFuture.complete(null);
                 } catch (Throwable e) {
                     LOG.error("Failed to execute worker service", e);
@@ -198,11 +212,20 @@ public class SenderIntegrateTest {
 
         List<CompletableFuture<Void>> futures = new ArrayList<>(workers.values());
         futures.add(masterFuture);
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+        try {
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+        } finally {
+            for (WorkerService workerService : workerServices) {
+                workerService.close();
+            }
+            masterServiceRef.get().close();
+        }
     }
 
     @Test
-    public void testOneWorkerWithBusyClient() throws InterruptedException {
+    public void testOneWorkerWithBusyClient() {
+        AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
+        AtomicReference<WorkerService> workerServiceRef = new AtomicReference<>();
         CompletableFuture<Void> masterFuture = new CompletableFuture<>();
         Thread masterThread = new Thread(() -> {
             String[] args = OptionsBuilder.newInstance()
@@ -220,8 +243,8 @@ public class SenderIntegrateTest {
                                           .withRpcServerPort(0)
                                           .build();
             try (MasterService service = initMaster(args)) {
+                masterServiceRef.set(service);
                 service.execute();
-                service.close();
                 masterFuture.complete(null);
             } catch (Throwable e) {
                 LOG.error("Failed to execute master service", e);
@@ -247,10 +270,10 @@ public class SenderIntegrateTest {
                                           .withTransoprtServerPort(transoprtServerPort)
                                           .build();
             try (WorkerService service = initWorker(args)) {
+                workerServiceRef.set(service);
                 // Let send rate slowly
                 this.slowSendFunc(service, transoprtServerPort);
                 service.execute();
-                service.close();
                 workerFuture.complete(null);
             } catch (Throwable e) {
                 LOG.error("Failed to execute worker service", e);
@@ -261,7 +284,12 @@ public class SenderIntegrateTest {
         masterThread.start();
         workerThread.start();
 
-        CompletableFuture.allOf(workerFuture, masterFuture).join();
+        try {
+            CompletableFuture.allOf(workerFuture, masterFuture).join();
+        } finally {
+            workerServiceRef.get().close();
+            masterServiceRef.get().close();
+        }
     }
 
     private void slowSendFunc(WorkerService service, int port) throws TransportException {