You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by zh...@apache.org on 2022/11/30 16:09:39 UTC
[incubator-hugegraph-computer] branch master updated: fix(test): Ensure resource can be closed when test failed (#210)
This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git
The following commit(s) were added to refs/heads/master by this push:
new 3fecd258 fix(test): Ensure resource can be closed when test failed (#210)
3fecd258 is described below
commit 3fecd258c15ef93fe89e5ae25ecf6e38ce3f190a
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Dec 1 00:09:33 2022 +0800
fix(test): Ensure resource can be closed when test failed (#210)
---
.../computer/core/master/MasterService.java | 2 +-
.../computer/core/worker/WorkerService.java | 2 +-
.../computer/algorithm/AlgorithmTestBase.java | 35 +++++++--------
.../core/network/netty/AbstractNetworkTest.java | 2 +-
.../network/netty/NettyTransportClientTest.java | 7 +--
.../hugegraph/computer/k8s/MiniKubeTest.java | 2 +
.../suite/integrate/SenderIntegrateTest.java | 50 +++++++++++++++++-----
7 files changed, 64 insertions(+), 36 deletions(-)
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
index 91164d40..58c2bbce 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
@@ -146,7 +146,7 @@ public class MasterService implements Closeable {
* {@link #init(Config)}.
*/
@Override
- public void close() {
+ public synchronized void close() {
this.checkInited();
if (this.closed) {
LOG.info("{} MasterService had closed before", this);
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
index 0ca45ed2..d7f5f1ab 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
@@ -157,7 +157,7 @@ public class WorkerService implements Closeable {
* {@link #init(Config)}.
*/
@Override
- public void close() {
+ public synchronized void close() {
this.checkInited();
if (this.closed) {
LOG.info("{} WorkerService had closed before", this);
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java b/computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java
index 283b9439..b68600d5 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/algorithm/AlgorithmTestBase.java
@@ -22,9 +22,11 @@ package org.apache.hugegraph.computer.algorithm;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
@@ -38,17 +40,20 @@ import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
+import com.google.common.collect.Sets;
+
public class AlgorithmTestBase extends UnitTestBase {
public static final Logger LOG = Log.logger(AlgorithmTestBase.class);
- public static void runAlgorithm(String algorithmParams, String ... options)
- throws InterruptedException {
+ public static void runAlgorithm(String algorithmParams, String... options)
+ throws InterruptedException {
final Logger log = Log.logger(AlgorithmTestBase.class);
ExecutorService pool = Executors.newFixedThreadPool(2);
CountDownLatch countDownLatch = new CountDownLatch(2);
Throwable[] exceptions = new Throwable[2];
-
+ AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
+ Set<WorkerService> workerServices = Sets.newConcurrentHashSet();
pool.submit(() -> {
WorkerService workerService = null;
try {
@@ -74,7 +79,7 @@ public class AlgorithmTestBase extends UnitTestBase {
}
Config config = ComputerContextUtil.initContext(params);
workerService = new MockWorkerService();
-
+ workerServices.add(workerService);
workerService.init(config);
workerService.execute();
} catch (Throwable e) {
@@ -85,9 +90,6 @@ public class AlgorithmTestBase extends UnitTestBase {
countDownLatch.countDown();
}
} finally {
- if (workerService != null) {
- workerService.close();
- }
countDownLatch.countDown();
}
});
@@ -119,7 +121,7 @@ public class AlgorithmTestBase extends UnitTestBase {
Config config = ComputerContextUtil.initContext(params);
masterService = new MasterService();
-
+ masterServiceRef.set(masterService);
masterService.init(config);
masterService.execute();
} catch (Throwable e) {
@@ -130,19 +132,18 @@ public class AlgorithmTestBase extends UnitTestBase {
countDownLatch.countDown();
}
} finally {
- /*
- * It must close the service first. The pool will be shutdown
- * if count down is executed first, and the server thread in
- * master service will not be closed.
- */
- if (masterService != null) {
- masterService.close();
- }
countDownLatch.countDown();
}
});
- countDownLatch.await();
+ try {
+ countDownLatch.await();
+ } finally {
+ for (WorkerService workerService : workerServices) {
+ workerService.close();
+ }
+ masterServiceRef.get().close();
+ }
pool.shutdownNow();
Assert.assertFalse(Arrays.asList(exceptions).toString(),
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/AbstractNetworkTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/AbstractNetworkTest.java
index e40fcb00..efde0efb 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/AbstractNetworkTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/AbstractNetworkTest.java
@@ -50,7 +50,7 @@ import org.mockito.Mockito;
import io.netty.bootstrap.ServerBootstrap;
-public abstract class AbstractNetworkTest {
+public abstract class AbstractNetworkTest extends UnitTestBase {
private static final Map<ConfigOption<?>, String> OPTIONS = new HashMap<>();
protected static Config config;
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
index f2a6c1b9..eb4a4f88 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
@@ -315,11 +315,8 @@ public class NettyTransportClientTest extends AbstractNetworkTest {
client.startSession();
- Mockito.doAnswer(invocationOnMock -> {
- invocationOnMock.callRealMethod();
- throw new RuntimeException("test exception");
- }).when(serverHandler)
- .handle(Mockito.any(), Mockito.anyInt(), Mockito.any());
+ Mockito.doThrow(new RuntimeException("test exception")).when(serverHandler)
+ .handle(Mockito.any(), Mockito.anyInt(), Mockito.any());
ByteBuffer buffer = ByteBuffer.wrap(StringEncoding.encode("test data"));
boolean send = client.send(MessageType.MSG, 1, buffer);
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/MiniKubeTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/MiniKubeTest.java
index 9c77af6e..a5bcf029 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/MiniKubeTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/MiniKubeTest.java
@@ -251,6 +251,8 @@ public class MiniKubeTest extends AbstractK8sTest {
this.driver.cancelJob(jobId, params);
+ UnitTestBase.sleep(1500L);
+
DefaultJobState jobState2 = new DefaultJobState();
jobState2.jobStatus(JobStatus.CANCELLED);
Mockito.verify(jobObserver, Mockito.timeout(15000L).atLeast(1))
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java
index 293fcaa3..30d21bda 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/suite/integrate/SenderIntegrateTest.java
@@ -19,12 +19,15 @@
package org.apache.hugegraph.computer.suite.integrate;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hugegraph.computer.algorithm.centrality.pagerank.PageRankParams;
@@ -49,6 +52,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
+import com.google.common.collect.Sets;
+
public class SenderIntegrateTest {
public static final Logger LOG = Log.logger(SenderIntegrateTest.class);
@@ -67,6 +72,8 @@ public class SenderIntegrateTest {
@Test
public void testOneWorker() {
+ AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
+ AtomicReference<WorkerService> workerServiceRef = new AtomicReference<>();
CompletableFuture<Void> masterFuture = new CompletableFuture<>();
Thread masterThread = new Thread(() -> {
String[] args = OptionsBuilder.newInstance()
@@ -85,8 +92,8 @@ public class SenderIntegrateTest {
.withRpcServerPort(0)
.build();
try (MasterService service = initMaster(args)) {
+ masterServiceRef.set(service);
service.execute();
- service.close();
masterFuture.complete(null);
} catch (Exception e) {
LOG.error("Failed to execute master service", e);
@@ -111,8 +118,8 @@ public class SenderIntegrateTest {
.withTransoprtServerPort(0)
.build();
try (WorkerService service = initWorker(args)) {
+ workerServiceRef.set(service);
service.execute();
- service.close();
workerFuture.complete(null);
} catch (Throwable e) {
LOG.error("Failed to execute worker service", e);
@@ -123,13 +130,20 @@ public class SenderIntegrateTest {
masterThread.start();
workerThread.start();
- CompletableFuture.allOf(workerFuture, masterFuture).join();
+ try {
+ CompletableFuture.allOf(workerFuture, masterFuture).join();
+ } finally {
+ workerServiceRef.get().close();
+ masterServiceRef.get().close();
+ }
}
@Test
- public void testMultiWorkers() {
+ public void testMultiWorkers() throws IOException {
int workerCount = 3;
int partitionCount = 3;
+ AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
+ Set<WorkerService> workerServices = Sets.newConcurrentHashSet();
CompletableFuture<Void> masterFuture = new CompletableFuture<>();
Thread masterThread = new Thread(() -> {
String[] args = OptionsBuilder.newInstance()
@@ -147,8 +161,8 @@ public class SenderIntegrateTest {
.build();
try {
MasterService service = initMaster(args);
+ masterServiceRef.set(service);
service.execute();
- service.close();
masterFuture.complete(null);
} catch (Throwable e) {
LOG.error("Failed to execute master service", e);
@@ -179,8 +193,8 @@ public class SenderIntegrateTest {
.build();
try {
WorkerService service = initWorker(args);
+ workerServices.add(service);
service.execute();
- service.close();
workerFuture.complete(null);
} catch (Throwable e) {
LOG.error("Failed to execute worker service", e);
@@ -198,11 +212,20 @@ public class SenderIntegrateTest {
List<CompletableFuture<Void>> futures = new ArrayList<>(workers.values());
futures.add(masterFuture);
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ try {
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ } finally {
+ for (WorkerService workerService : workerServices) {
+ workerService.close();
+ }
+ masterServiceRef.get().close();
+ }
}
@Test
- public void testOneWorkerWithBusyClient() throws InterruptedException {
+ public void testOneWorkerWithBusyClient() {
+ AtomicReference<MasterService> masterServiceRef = new AtomicReference<>();
+ AtomicReference<WorkerService> workerServiceRef = new AtomicReference<>();
CompletableFuture<Void> masterFuture = new CompletableFuture<>();
Thread masterThread = new Thread(() -> {
String[] args = OptionsBuilder.newInstance()
@@ -220,8 +243,8 @@ public class SenderIntegrateTest {
.withRpcServerPort(0)
.build();
try (MasterService service = initMaster(args)) {
+ masterServiceRef.set(service);
service.execute();
- service.close();
masterFuture.complete(null);
} catch (Throwable e) {
LOG.error("Failed to execute master service", e);
@@ -247,10 +270,10 @@ public class SenderIntegrateTest {
.withTransoprtServerPort(transoprtServerPort)
.build();
try (WorkerService service = initWorker(args)) {
+ workerServiceRef.set(service);
// Let send rate slowly
this.slowSendFunc(service, transoprtServerPort);
service.execute();
- service.close();
workerFuture.complete(null);
} catch (Throwable e) {
LOG.error("Failed to execute worker service", e);
@@ -261,7 +284,12 @@ public class SenderIntegrateTest {
masterThread.start();
workerThread.start();
- CompletableFuture.allOf(workerFuture, masterFuture).join();
+ try {
+ CompletableFuture.allOf(workerFuture, masterFuture).join();
+ } finally {
+ workerServiceRef.get().close();
+ masterServiceRef.get().close();
+ }
}
private void slowSendFunc(WorkerService service, int port) throws TransportException {