You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/08/27 08:56:38 UTC
[ignite-3] branch main updated: IGNITE-15313 shutdown Executors
correctly in JRaft codebase. Fixes #288
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 42a5d78 IGNITE-15313 shutdown Executors correctly in JRaft codebase. Fixes #288
42a5d78 is described below
commit 42a5d780fc5f4c335eebcf14286cdc12b37694e8
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Fri Aug 27 11:56:15 2021 +0300
IGNITE-15313 shutdown Executors correctly in JRaft codebase. Fixes #288
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../tostring/IgniteToStringBuilderSelfTest.java | 6 +-
.../client/service/impl/RaftGroupServiceImpl.java | 3 +-
.../ignite/raft/jraft/core/ITCliServiceTest.java | 8 +-
.../apache/ignite/raft/jraft/core/ITNodeTest.java | 46 +++++--
.../apache/ignite/raft/server/CounterListener.java | 7 +-
.../internal/raft/server/impl/JRaftServerImpl.java | 23 +++-
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 11 +-
.../raft/jraft/storage/impl/RocksDBLogStorage.java | 6 +-
.../jraft/closure/SynchronizedClosureTest.java | 40 +++---
.../ignite/raft/jraft/core/BallotBoxTest.java | 7 +-
.../ignite/raft/jraft/core/FSMCallerTest.java | 8 +-
.../ignite/raft/jraft/core/IteratorImplTest.java | 13 +-
.../raft/jraft/core/ReadOnlyServiceTest.java | 21 ++-
.../ignite/raft/jraft/core/ReplicatorTest.java | 7 +-
.../apache/ignite/raft/jraft/core/TestCluster.java | 35 ++++-
.../jraft/entity/codec/LogEntryCodecPerfTest.java | 9 +-
.../raft/jraft/rpc/AbstractClientServiceTest.java | 12 +-
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 21 ++-
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 27 +---
.../ignite/raft/jraft/rpc/impl/FutureTest.java | 75 ++++++-----
.../core/AppendEntriesRequestProcessorTest.java | 3 +-
.../impl/core/BaseNodeRequestProcessorTest.java | 19 ++-
.../raft/jraft/storage/SnapshotExecutorTest.java | 16 ++-
.../raft/jraft/storage/impl/LogManagerTest.java | 8 +-
.../storage/snapshot/remote/CopySessionTest.java | 54 ++++----
.../raft/jraft/util/ByteBufferCollectorTest.java | 7 +-
.../ignite/raft/jraft/util/CountDownEventTest.java | 14 +-
.../jraft/util/RecyclableByteBufferListTest.java | 7 +-
.../ignite/raft/jraft/util/RecyclersTest.java | 21 ++-
.../ignite/raft/jraft/util/ThreadIdTest.java | 70 +++++-----
.../apache/ignite/raft/jraft/util/UtilsTest.java | 10 +-
.../LongHeldDetectingReadWriteLockTest.java | 146 +++++++++++++--------
modules/table/pom.xml | 7 +
.../internal/tx/AbstractLockManagerTest.java | 26 ++--
34 files changed, 531 insertions(+), 262 deletions(-)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/tostring/IgniteToStringBuilderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/tostring/IgniteToStringBuilderSelfTest.java
index 68114a9..82e087a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/tostring/IgniteToStringBuilderSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/tostring/IgniteToStringBuilderSelfTest.java
@@ -32,11 +32,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteSystemProperties;
import org.junit.jupiter.api.Test;
@@ -182,7 +184,7 @@ public class IgniteToStringBuilderSelfTest extends IgniteAbstractTest {
String expN4 = n4.toString();
CyclicBarrier bar = new CyclicBarrier(4);
- Executor pool = Executors.newFixedThreadPool(4);
+ ExecutorService pool = Executors.newFixedThreadPool(4);
CompletableFuture<String> fut1 = runAsync(new BarrierCallable(bar, n1, expN1), pool);
CompletableFuture<String> fut2 = runAsync(new BarrierCallable(bar, n2, expN2), pool);
@@ -193,6 +195,8 @@ public class IgniteToStringBuilderSelfTest extends IgniteAbstractTest {
fut2.get(3_000, TimeUnit.MILLISECONDS);
fut3.get(3_000, TimeUnit.MILLISECONDS);
fut4.get(3_000, TimeUnit.MILLISECONDS);
+
+ IgniteUtils.shutdownAndAwaitTermination(pool, 3_000, TimeUnit.MILLISECONDS);
}
/**
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
index e920ec1..246211a 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkMessage;
@@ -362,7 +363,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
/** {@inheritDoc} */
@Override public void shutdown() {
- // No-op.
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
}
/**
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
index bf25cee..bcc29f9 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITCliServiceTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -48,6 +49,7 @@ import org.apache.ignite.raft.jraft.entity.Task;
import org.apache.ignite.raft.jraft.option.CliOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -86,6 +88,8 @@ public class ITCliServiceTest {
private Configuration conf;
+ private ExecutorService clientExecutor;
+
/** */
@BeforeEach
public void setup(TestInfo testInfo, @WorkDirectory Path dataPath) throws Exception {
@@ -113,7 +117,8 @@ public class ITCliServiceTest {
conf = new Configuration(peers, learners);
CliOptions opts = new CliOptions();
- opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, "client"));
+ clientExecutor = JRaftUtils.createClientExecutor(opts, "client");
+ opts.setClientExecutor(clientExecutor);
NodeFinder nodeFinder = peers.stream()
.map(PeerId::getEndpoint)
@@ -148,6 +153,7 @@ public class ITCliServiceTest {
public void teardown(TestInfo testInfo) throws Exception {
cliService.shutdown();
cluster.stopAll();
+ ExecutorServiceHelper.shutdownAndAwaitTermination(clientExecutor);
LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName());
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index 499056a..6fe6f80 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.ignite.raft.jraft.core;
-import com.codahale.metrics.ConsoleReporter;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Path;
@@ -29,7 +28,6 @@ import java.util.List;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -39,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
+import com.codahale.metrics.ConsoleReporter;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.lang.IgniteLogger;
@@ -85,7 +84,9 @@ import org.apache.ignite.raft.jraft.storage.snapshot.ThroughputSnapshotThrottle;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Bits;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
+import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -151,6 +152,10 @@ public class ITNodeTest {
private final List<RaftGroupService> services = new ArrayList<>();
+ private final List<ExecutorService> executors = new ArrayList<>();
+
+ private final List<FixedThreadsExecutorGroup> appendEntriesExecutors = new ArrayList<>();
+
@BeforeAll
public static void setupNodeTest() {
dumpThread = new DumpThread();
@@ -187,6 +192,10 @@ public class ITNodeTest {
}
});
+ executors.forEach(ExecutorServiceHelper::shutdownAndAwaitTermination);
+
+ appendEntriesExecutors.forEach(FixedThreadsExecutorGroup::shutdownGracefully);
+
if (cluster != null)
cluster.stopAll();
@@ -1354,10 +1363,15 @@ public class ITNodeTest {
assertEquals(3, leader.listPeers().size());
CountDownLatch latch = new CountDownLatch(10);
+
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+
+ executors.add(executor);
+
for (int i = 0; i < 10; i++) {
- new Thread() {
- @Override
- public void run() {
+ executor.submit(new Runnable() {
+ /** {@inheritDoc} */
+ @Override public void run() {
try {
for (int i = 0; i < 100; i++) {
try {
@@ -1397,7 +1411,7 @@ public class ITNodeTest {
Thread.currentThread().interrupt();
}
}
- }.start();
+ });
}
latch.await();
@@ -3049,6 +3063,8 @@ public class ITNodeTest {
ExecutorService executor = Executors.newSingleThreadExecutor();
+ executors.add(executor);
+
return Utils.runInThread(executor, () -> {
try {
while (!arg.stop) {
@@ -3196,7 +3212,9 @@ public class ITNodeTest {
List<Future<?>> futures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(threads);
- Executor executor = Executors.newFixedThreadPool(threads);
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
+
+ executors.add(executor);
for (int t = 0; t < threads; t++) {
ChangeArg arg = new ChangeArg(cluster, peers, false, true);
@@ -3309,8 +3327,12 @@ public class ITNodeTest {
private NodeOptions createNodeOptions() {
NodeOptions options = new NodeOptions();
- options.setCommonExecutor(JRaftUtils.createCommonExecutor(options));
- options.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(options));
+ ExecutorService executor = JRaftUtils.createCommonExecutor(options);
+ executors.add(executor);
+ options.setCommonExecutor(executor);
+ FixedThreadsExecutorGroup appendEntriesExecutor = JRaftUtils.createAppendEntriesExecutor(options);
+ appendEntriesExecutors.add(appendEntriesExecutor);
+ options.setStripedExecutor(appendEntriesExecutor);
return options;
}
@@ -3438,7 +3460,11 @@ public class ITNodeTest {
new TestScaleCubeClusterServiceFactory()
);
- IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions);
+ ExecutorService requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
+
+ executors.add(requestExecutor);
+
+ IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor);
nodeOptions.setRpcClient(new IgniteRpcClient(clusterService));
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
index 1936a4f..37fb909 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
@@ -21,7 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -30,6 +30,7 @@ import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
/**
@@ -49,7 +50,7 @@ public class CounterListener implements RaftGroupListener {
/**
* Snapshot executor.
*/
- private Executor executor = Executors.newSingleThreadExecutor();
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
/** {@inheritDoc} */
@Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
@@ -106,7 +107,7 @@ public class CounterListener implements RaftGroupListener {
/** {@inheritDoc} */
@Override public void onShutdown() {
- // No-op.
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
}
/**
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index cf4283f..b0cf5c3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -25,6 +25,7 @@ import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -57,6 +58,7 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.JDKMarshaller;
import org.jetbrains.annotations.Nullable;
@@ -84,6 +86,9 @@ public class JRaftServerImpl implements RaftServer {
/** Options. */
private final NodeOptions opts;
+ /** Request executor. */
+ private ExecutorService requestExecutor;
+
/**
* @param service Cluster service.
* @param dataPath Data path.
@@ -125,12 +130,14 @@ public class JRaftServerImpl implements RaftServer {
if (opts.getClientExecutor() == null)
opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
+ requestExecutor = JRaftUtils.createRequestExecutor(opts);
+
rpcServer = new IgniteRpcServer(
service,
nodeManager,
opts.getRaftClientMessagesFactory(),
opts.getRaftMessagesFactory(),
- JRaftUtils.createRequestExecutor(opts)
+ requestExecutor
);
if (opts.getfSMCallerExecutorDisruptor() == null) {
@@ -186,6 +193,20 @@ public class JRaftServerImpl implements RaftServer {
if (opts.getLogManagerDisruptor() != null)
opts.getLogManagerDisruptor().shutdown();
+
+ if (opts.getCommonExecutor() != null)
+ ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getCommonExecutor());
+
+ if (opts.getStripedExecutor() != null)
+ opts.getStripedExecutor().shutdownGracefully();
+
+ if (opts.getScheduler() != null)
+ opts.getScheduler().shutdown();
+
+ if (opts.getClientExecutor() != null)
+ ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getClientExecutor());
+
+ ExecutorServiceHelper.shutdownAndAwaitTermination(requestExecutor);
}
/** {@inheritDoc} */
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index fa7f43c..9a42dc8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -21,17 +21,17 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import org.apache.ignite.network.NetworkMessageHandler;
-import org.apache.ignite.raft.client.message.RaftClientMessageGroup;
-import org.apache.ignite.raft.jraft.RaftMessageGroup;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.raft.client.message.RaftClientMessageGroup;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
import org.apache.ignite.raft.jraft.rpc.RpcServer;
@@ -53,7 +53,6 @@ import org.apache.ignite.raft.jraft.rpc.impl.core.InstallSnapshotRequestProcesso
import org.apache.ignite.raft.jraft.rpc.impl.core.ReadIndexRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.RequestVoteRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.core.TimeoutNowRequestProcessor;
-import org.jetbrains.annotations.Nullable;
/**
* TODO https://issues.apache.org/jira/browse/IGNITE-14519 Unsubscribe on shutdown
@@ -81,7 +80,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
NodeManager nodeManager,
RaftClientMessagesFactory raftClientMessagesFactory,
RaftMessagesFactory raftMessagesFactory,
- @Nullable Executor rpcExecutor
+ Executor rpcExecutor
) {
this.service = service;
this.nodeManager = nodeManager;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java
index 4f20001..d27b731 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -42,6 +42,7 @@ import org.apache.ignite.raft.jraft.util.Bits;
import org.apache.ignite.raft.jraft.util.BytesUtil;
import org.apache.ignite.raft.jraft.util.DebugStatistics;
import org.apache.ignite.raft.jraft.util.Describer;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.StorageOptionsFactory;
import org.apache.ignite.raft.jraft.util.Utils;
@@ -145,7 +146,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
private LogEntryEncoder logEntryEncoder;
private LogEntryDecoder logEntryDecoder;
- private Executor executor = Executors.newSingleThreadExecutor();
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
public RocksDBLogStorage(final String path, final RaftOptions raftOptions) {
super();
@@ -679,6 +680,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
* Called after closing db.
*/
protected void onShutdown() {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
}
/**
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/closure/SynchronizedClosureTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/closure/SynchronizedClosureTest.java
index 9617474..b78a420 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/closure/SynchronizedClosureTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/closure/SynchronizedClosureTest.java
@@ -41,27 +41,29 @@ public class SynchronizedClosureTest {
public void testAwaitRun() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicLong cost = new AtomicLong(0);
- new Thread() {
- @Override
- public void run() {
- try {
- long start = System.currentTimeMillis();
- done.await();
- cost.set(System.currentTimeMillis() - start);
- }
- catch (InterruptedException e) {
- LOG.error("Thread was interrupted", e);
- }
- latch.countDown();
+ Thread t = new Thread(() -> {
+ try {
+ long start = System.currentTimeMillis();
+ done.await();
+ cost.set(System.currentTimeMillis() - start);
}
- }.start();
+ catch (InterruptedException e) {
+ LOG.error("Thread was interrupted", e);
+ }
+ latch.countDown();
+ });
+ try {
+ t.start();
- int n = 1000;
- Thread.sleep(n);
- this.done.run(Status.OK());
- latch.await();
- assertEquals(n, cost.get(), 50);
- assertTrue(this.done.getStatus().isOk());
+ int n = 1000;
+ Thread.sleep(n);
+ this.done.run(Status.OK());
+ latch.await();
+ assertEquals(n, cost.get(), 50);
+ assertTrue(this.done.getStatus().isOk());
+ } finally {
+ t.join();
+ }
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
index ed24fb6..42b1a2b 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.core;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.JRaftUtils;
@@ -24,6 +25,7 @@ import org.apache.ignite.raft.jraft.closure.ClosureQueueImpl;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.BallotBoxOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -45,12 +47,14 @@ public class BallotBoxTest {
@Mock
private FSMCaller waiter;
private ClosureQueueImpl closureQueue;
+ private ExecutorService executor;
@BeforeEach
public void setup() {
BallotBoxOptions opts = new BallotBoxOptions();
NodeOptions options = new NodeOptions();
- options.setCommonExecutor(JRaftUtils.createExecutor("test-executor-", Utils.cpus()));
+ executor = JRaftUtils.createExecutor("test-executor-", Utils.cpus());
+ options.setCommonExecutor(executor);
this.closureQueue = new ClosureQueueImpl(options);
opts.setClosureQueue(this.closureQueue);
opts.setWaiter(this.waiter);
@@ -61,6 +65,7 @@ public class BallotBoxTest {
@AfterEach
public void teardown() {
box.shutdown();
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
index b0d56ad..bdcf77d 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.core;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.raft.jraft.Iterator;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -40,6 +41,7 @@ import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -69,11 +71,14 @@ public class FSMCallerTest {
/** Disruptor for this service test. */
private StripedDisruptor disruptor;
+ private ExecutorService executor;
+
@BeforeEach
public void setup() {
this.fsmCaller = new FSMCallerImpl();
NodeOptions options = new NodeOptions();
- options.setCommonExecutor(JRaftUtils.createExecutor("test-executor-", Utils.cpus()));
+ executor = JRaftUtils.createExecutor("test-executor-", Utils.cpus());
+ options.setCommonExecutor(executor);
this.closureQueue = new ClosureQueueImpl(options);
opts = new FSMCallerOptions();
Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
@@ -99,6 +104,7 @@ public class FSMCallerTest {
this.fsmCaller.join();
disruptor.shutdown();
}
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/IteratorImplTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/IteratorImplTest.java
index 33cdf85..885b237 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/IteratorImplTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/IteratorImplTest.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft.core;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.JRaftUtils;
@@ -28,7 +29,9 @@ import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.storage.LogManager;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -60,6 +63,8 @@ public class IteratorImplTest {
private AtomicLong applyingIndex;
+ private ExecutorService executor;
+
@BeforeEach
public void setup() {
this.applyingIndex = new AtomicLong(0);
@@ -72,10 +77,16 @@ public class IteratorImplTest {
Mockito.when(this.logManager.getEntry(i)).thenReturn(log);
}
NodeOptions nodeOptions = new NodeOptions();
- nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
+ executor = JRaftUtils.createExecutor("test-executor", Utils.cpus());
+ nodeOptions.setCommonExecutor(executor);
this.iter = new IteratorImpl(fsm, logManager, closures, 0L, 0L, 10L, applyingIndex, nodeOptions);
}
+ @AfterEach
+ public void teardown() {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
+ }
+
@Test
public void testPredicates() {
assertTrue(this.iter.isGood());
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
index 53d9f98..05af451 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
@@ -18,7 +18,9 @@ package org.apache.ignite.raft.jraft.core;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -36,6 +38,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Bytes;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -71,6 +74,10 @@ public class ReadOnlyServiceTest {
/** Disruptor for this service test. */
private StripedDisruptor disruptor;
+ private List<ExecutorService> executors = new ArrayList<>();
+
+ private Scheduler scheduler;
+
@BeforeEach
public void setup() {
this.readOnlyServiceImpl = new ReadOnlyServiceImpl();
@@ -86,9 +93,15 @@ public class ReadOnlyServiceTest {
() -> new ReadOnlyServiceImpl.ReadIndexEvent(),
1));
NodeOptions nodeOptions = new NodeOptions();
- nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
- nodeOptions.setClientExecutor(JRaftUtils.createClientExecutor(nodeOptions, "unittest"));
- nodeOptions.setScheduler(JRaftUtils.createScheduler(nodeOptions));
+ ExecutorService executor = JRaftUtils.createExecutor("test-executor", Utils.cpus());
+ executors.add(executor);
+ nodeOptions.setCommonExecutor(executor);
+ ExecutorService clientExecutor = JRaftUtils.createClientExecutor(nodeOptions, "unittest");
+ executors.add(clientExecutor);
+ nodeOptions.setClientExecutor(clientExecutor);
+ Scheduler scheduler = JRaftUtils.createScheduler(nodeOptions);
+ this.scheduler = scheduler;
+ nodeOptions.setScheduler(scheduler);
Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
Mockito.when(this.node.getGroupId()).thenReturn("test");
Mockito.when(this.node.getTimerManager()).thenReturn(nodeOptions.getScheduler());
@@ -103,6 +116,8 @@ public class ReadOnlyServiceTest {
this.readOnlyServiceImpl.shutdown();
this.readOnlyServiceImpl.join();
disruptor.shutdown();
+ executors.forEach(ExecutorServiceHelper::shutdownAndAwaitTermination);
+ scheduler.shutdown();
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
index e7beb18..103ac9d 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import org.apache.ignite.raft.jraft.JRaftUtils;
@@ -47,6 +48,7 @@ import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.util.ByteString;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.ThreadId;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
@@ -90,6 +92,7 @@ public class ReplicatorTest {
private SnapshotStorage snapshotStorage;
private ReplicatorOptions opts;
private final PeerId peerId = new PeerId("localhost", 8081);
+ private ExecutorService executor;
@BeforeEach
public void setup() {
@@ -109,7 +112,8 @@ public class ReplicatorTest {
this.opts.setElectionTimeoutMs(1000);
NodeOptions options = new NodeOptions();
- options.setCommonExecutor(JRaftUtils.createExecutor("test-executor-", Utils.cpus()));
+ executor = JRaftUtils.createExecutor("test-executor-", Utils.cpus());
+ options.setCommonExecutor(executor);
Mockito.when(this.logManager.getLastLogIndex()).thenReturn(10L);
Mockito.when(this.logManager.getTerm(10)).thenReturn(1L);
@@ -154,6 +158,7 @@ public class ReplicatorTest {
@AfterEach
public void teardown() {
this.timerManager.shutdown();
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index d0a983f..a6d1d9f 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -28,6 +28,8 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@@ -56,6 +58,8 @@ import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.Nullable;
@@ -108,6 +112,12 @@ public class TestCluster {
*/
private final HashMap<Endpoint, StripedDisruptor<LogManagerImpl.StableClosureEvent>> logManagerDisruptors = new HashMap<>();
+ private List<ExecutorService> executors = new CopyOnWriteArrayList<>();
+
+ private List<FixedThreadsExecutorGroup> fixedThreadsExecutorGroups = new CopyOnWriteArrayList<>();
+
+ private List<Scheduler> schedulers = new CopyOnWriteArrayList<>();
+
private JRaftServiceFactory raftServiceFactory = new TestJRaftServiceFactory();
private LinkedHashSet<PeerId> learners;
@@ -209,10 +219,18 @@ public class TestCluster {
nodeOptions.setServerName(listenAddr.toString());
- nodeOptions.setCommonExecutor(JRaftUtils.createCommonExecutor(nodeOptions));
- nodeOptions.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(nodeOptions));
- nodeOptions.setClientExecutor(JRaftUtils.createClientExecutor(nodeOptions, nodeOptions.getServerName()));
- nodeOptions.setScheduler(JRaftUtils.createScheduler(nodeOptions));
+ ExecutorService executor = JRaftUtils.createCommonExecutor(nodeOptions);
+ executors.add(executor);
+ nodeOptions.setCommonExecutor(executor);
+ FixedThreadsExecutorGroup threadsExecutorGroup = JRaftUtils.createAppendEntriesExecutor(nodeOptions);
+ fixedThreadsExecutorGroups.add(threadsExecutorGroup);
+ nodeOptions.setStripedExecutor(threadsExecutorGroup);
+ ExecutorService clientExecutor = JRaftUtils.createClientExecutor(nodeOptions, nodeOptions.getServerName());
+ executors.add(clientExecutor);
+ nodeOptions.setClientExecutor(clientExecutor);
+ Scheduler scheduler = JRaftUtils.createScheduler(nodeOptions);
+ schedulers.add(scheduler);
+ nodeOptions.setScheduler(scheduler);
nodeOptions.setElectionTimeoutMs(this.electionTimeoutMs);
nodeOptions.setEnableMetrics(enableMetrics);
@@ -278,7 +296,11 @@ public class TestCluster {
nodeOptions.setRpcClient(rpcClient);
- var rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions);
+ ExecutorService requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
+
+ executors.add(requestExecutor);
+
+ var rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor);
clusterService.start();
@@ -364,6 +386,9 @@ public class TestCluster {
nodeDisruptors.values().forEach(StripedDisruptor::shutdown);
readOnlyServiceDisruptors.values().forEach(StripedDisruptor::shutdown);
logManagerDisruptors.values().forEach(StripedDisruptor::shutdown);
+ executors.forEach(ExecutorServiceHelper::shutdownAndAwaitTermination);
+ fixedThreadsExecutorGroups.forEach(FixedThreadsExecutorGroup::shutdownGracefully);
+ schedulers.forEach(Scheduler::shutdown);
}
public void clean(final Endpoint listenAddr) {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
index ad3121f..6691630 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
@@ -20,6 +20,8 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.lang.IgniteLogger;
@@ -29,6 +31,7 @@ import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.codec.v1.V1Decoder;
import org.apache.ignite.raft.jraft.entity.codec.v1.V1Encoder;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -99,8 +102,9 @@ public class LogEntryCodecPerfTest {
throws InterruptedException,
BrokenBarrierException {
final CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
+ ExecutorService executor = Executors.newFixedThreadPool(THREADS);
for (int i = 0; i < THREADS; i++) {
- new Thread(() -> {
+ executor.execute(() -> {
try {
testEncodeDecode(encoder, decoder, barrier);
}
@@ -108,12 +112,13 @@ public class LogEntryCodecPerfTest {
LOG.error("Failed to run test", e); // NOPMD
fail();
}
- }).start();
+ });
}
long start = Utils.monotonicMs();
barrier.await();
barrier.await();
LOG.info(version + " codec cost:" + (Utils.monotonicMs() - start) + " ms.");
LOG.info("Total log size:" + this.logSize.get() + " bytes.");
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java
index 3a65314..9c894e6 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractClientServiceTest.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft.rpc;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Status;
@@ -30,6 +31,8 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests.PingRequest;
import org.apache.ignite.raft.jraft.rpc.impl.AbstractClientService;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -59,17 +62,24 @@ public class AbstractClientServiceTest {
private RpcClient rpcClient;
private RpcResponseFactory rpcResponseFactory = RaftRpcFactory.DEFAULT;
private final Endpoint endpoint = new Endpoint("localhost", 8081);
+ private ExecutorService clientExecutor;
@BeforeEach
public void setup() throws Exception {
this.rpcOptions = new RpcOptions();
- this.rpcOptions.setClientExecutor(JRaftUtils.createClientExecutor(this.rpcOptions, "unittest"));
+ clientExecutor = JRaftUtils.createClientExecutor(this.rpcOptions, "unittest");
+ this.rpcOptions.setClientExecutor(clientExecutor);
this.clientService = new MockClientService();
when(this.rpcClient.invokeAsync(any(), any(), any(), any(), anyLong())).thenReturn(new CompletableFuture<>());
this.rpcOptions.setRpcClient(this.rpcClient);
assertTrue(this.clientService.init(this.rpcOptions));
}
+ @AfterEach
+ public void teardown() {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(clientExecutor);
+ }
+
static class MockRpcResponseClosure<T extends Message> extends RpcResponseClosureAdapter<T> {
CountDownLatch latch = new CountDownLatch(1);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index 69cfafc..f9bf10b 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -19,16 +19,20 @@ package org.apache.ignite.raft.jraft.rpc;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterEach;
import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
@@ -39,6 +43,17 @@ public class IgniteRpcTest extends AbstractRpcTest {
/** The counter. */
private final AtomicInteger cntr = new AtomicInteger();
+ /** Requests executor. */
+ private ExecutorService requestExecutor;
+
+ /** {@inheritDoc} */
+ @AfterEach
+ @Override public void tearDown() {
+ super.tearDown();
+
+ ExecutorServiceHelper.shutdownAndAwaitTermination(requestExecutor);
+ }
+
/** {@inheritDoc} */
@Override public RpcServer<?> createServer(Endpoint endpoint) {
ClusterService service = ClusterServiceTestUtils.clusterService(
@@ -49,7 +64,11 @@ public class IgniteRpcTest extends AbstractRpcTest {
new TestScaleCubeClusterServiceFactory()
);
- var server = new TestIgniteRpcServer(service, new NodeManager(), new NodeOptions()) {
+ NodeOptions nodeOptions = new NodeOptions();
+
+ requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
+
+ var server = new TestIgniteRpcServer(service, new NodeManager(), nodeOptions, requestExecutor) {
@Override public void shutdown() {
super.shutdown();
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 77846bb..c71390b 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -17,8 +17,8 @@
package org.apache.ignite.raft.jraft.rpc;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
@@ -27,39 +27,22 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
* RPC server configured for integration tests.
*/
public class TestIgniteRpcServer extends IgniteRpcServer {
- /** */
- private final NodeOptions nodeOptions;
-
/**
* @param clusterService Cluster service.
* @param nodeManager Node manager.
* @param nodeOptions Node options.
+ * @param requestExecutor Requests executor.
*/
- public TestIgniteRpcServer(ClusterService clusterService, NodeManager nodeManager, NodeOptions nodeOptions) {
+ public TestIgniteRpcServer(ClusterService clusterService, NodeManager nodeManager, NodeOptions nodeOptions,
+ ExecutorService requestExecutor) {
super(
clusterService,
nodeManager,
nodeOptions.getRaftClientMessagesFactory(),
nodeOptions.getRaftMessagesFactory(),
- JRaftUtils.createRequestExecutor(nodeOptions)
+ requestExecutor
);
clusterService.messagingService().addMessageHandler(TestMessageGroup.class, new RpcMessageHandler());
-
- this.nodeOptions = nodeOptions;
- }
-
- /** {@inheritDoc} */
- @Override public void shutdown() {
- super.shutdown();
-
- if (this.nodeOptions.getClientExecutor() != null)
- this.nodeOptions.getClientExecutor().shutdown();
-
- if (this.nodeOptions.getStripedExecutor() != null)
- this.nodeOptions.getStripedExecutor().shutdownGracefully();
-
- if (this.nodeOptions.getCommonExecutor() != null)
- this.nodeOptions.getCommonExecutor().shutdown();
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/FutureTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/FutureTest.java
index 7b4d59a..f9ca12b 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/FutureTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/FutureTest.java
@@ -64,11 +64,16 @@ public class FutureTest {
@Test
public void testGet() throws Exception {
FutureImpl<Boolean> future = new FutureImpl<Boolean>();
- new Thread(new NotifyFutureRunner(future, 2000, null)).start();
- boolean result = future.get();
- assertTrue(result);
- assertTrue(future.isDone());
- assertFalse(future.isCancelled());
+ Thread t = new Thread(new NotifyFutureRunner(future, 2000, null));
+ try {
+ t.start();
+ boolean result = future.get();
+ assertTrue(result);
+ assertTrue(future.isDone());
+ assertFalse(future.isCancelled());
+ } finally {
+ t.join();
+ }
}
@Test
@@ -84,45 +89,51 @@ public class FutureTest {
@Test
public void testGetException() throws Exception {
FutureImpl<Boolean> future = new FutureImpl<Boolean>();
- new Thread(new NotifyFutureRunner(future, 2000, new IOException("hello"))).start();
+ Thread t = new Thread(new NotifyFutureRunner(future, 2000, new IOException("hello")));
try {
- future.get();
- fail();
- }
- catch (ExecutionException e) {
- assertEquals("hello", e.getCause().getMessage());
+ t.start();
+ try {
+ future.get();
+ fail();
+ }
+ catch (ExecutionException e) {
+ assertEquals("hello", e.getCause().getMessage());
+ }
+ assertTrue(future.isDone());
+ assertFalse(future.isCancelled());
+ } finally {
+ t.join();
}
- assertTrue(future.isDone());
- assertFalse(future.isCancelled());
-
}
@Test
public void testCancel() throws Exception {
final FutureImpl<Boolean> future = new FutureImpl<Boolean>();
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(3000);
- future.cancel(true);
- }
- catch (Exception e) {
- log.error(e.getMessage(), e);
- }
+ Thread t = new Thread(() -> {
+ try {
+ Thread.sleep(3000);
+ future.cancel(true);
}
- }).start();
+ catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ });
try {
- future.get();
- fail();
- }
- catch (CancellationException e) {
- assertTrue(true);
+ t.start();
+ try {
+ future.get();
+ fail();
+ }
+ catch (CancellationException e) {
+ assertTrue(true);
+ }
+ assertTrue(future.isDone());
+ assertTrue(future.isCancelled());
+ } finally {
+ t.join();
}
- assertTrue(future.isDone());
- assertTrue(future.isCancelled());
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
index 9bb0432..1809205 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java
@@ -76,10 +76,11 @@ public class AppendEntriesRequestProcessorTest extends BaseNodeRequestProcessorT
}
@AfterEach
- public void teardown() {
+ @Override public void teardown() {
if (this.executor != null) {
this.executor.shutdownNow();
}
+ super.teardown();
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
index 1f92712..c6d83dd 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/core/BaseNodeRequestProcessorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.rpc.impl.core;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -26,6 +27,9 @@ import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftServerService;
import org.apache.ignite.raft.jraft.test.MockAsyncContext;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -41,6 +45,8 @@ public abstract class BaseNodeRequestProcessorTest<T extends Message> {
protected final String peerIdStr = "localhost:8081";
protected MockAsyncContext asyncContext;
protected RaftMessagesFactory msgFactory = new RaftMessagesFactory();
+ private ExecutorService executor;
+ private FixedThreadsExecutorGroup appendEntriesExecutor;
public abstract T createRequest(String groupId, PeerId peerId);
@@ -54,6 +60,13 @@ public abstract class BaseNodeRequestProcessorTest<T extends Message> {
this.asyncContext = new MockAsyncContext();
}
+ @AfterEach
+ public void teardown() {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
+ if (appendEntriesExecutor != null)
+ appendEntriesExecutor.shutdownGracefully();
+ }
+
@Test
public void testHandleRequest() {
final PeerId peerId = mockNode();
@@ -71,8 +84,10 @@ public abstract class BaseNodeRequestProcessorTest<T extends Message> {
NodeOptions nodeOptions = new NodeOptions();
- nodeOptions.setCommonExecutor(JRaftUtils.createCommonExecutor(nodeOptions));
- nodeOptions.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(nodeOptions));
+ executor = JRaftUtils.createCommonExecutor(nodeOptions);
+ nodeOptions.setCommonExecutor(executor);
+ appendEntriesExecutor = JRaftUtils.createAppendEntriesExecutor(nodeOptions);
+ nodeOptions.setStripedExecutor(appendEntriesExecutor);
Mockito.lenient().when(node.getOptions()).thenReturn(nodeOptions);
if (asyncContext != null)
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
index 597e25b..e650e03 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft.storage;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
@@ -52,6 +53,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotWriter;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -97,6 +99,7 @@ public class SnapshotExecutorTest extends BaseStorageTest {
private LocalSnapshotStorage snapshotStorage;
private TimerManager timerManager;
private NodeOptions options;
+ private ExecutorService executorService;
@BeforeEach
public void setup() throws Exception {
@@ -139,6 +142,7 @@ public class SnapshotExecutorTest extends BaseStorageTest {
executor.shutdown();
timerManager.shutdown();
options.getCommonExecutor().shutdown();
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executorService);
}
@Test
@@ -244,8 +248,10 @@ public class SnapshotExecutorTest extends BaseStorageTest {
Mockito.lenient().when(
raftClientService.getFile(eq(new Endpoint("localhost", 8080)), eq(rb),
eq(copyOpts.getTimeoutMs()), argument.capture())).thenReturn(future);
+ ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
+ executorService = singleThreadExecutor;
Utils.runInThread(
- Executors.newSingleThreadExecutor(),
+ singleThreadExecutor,
() -> executor.installSnapshot(irb, msgFactory.installSnapshotResponse(), new RpcRequestClosure(asyncCtx, msgFactory))
);
@@ -278,7 +284,9 @@ public class SnapshotExecutorTest extends BaseStorageTest {
public void testNotDoSnapshotWithIntervalDist() throws Exception {
final NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setSnapshotLogIndexMargin(10);
- nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
+ ExecutorService testExecutor = JRaftUtils.createExecutor("test-executor", Utils.cpus());
+ executorService = testExecutor;
+ nodeOptions.setCommonExecutor(testExecutor);
Mockito.when(node.getOptions()).thenReturn(nodeOptions);
Mockito.when(fSMCaller.getLastAppliedIndex()).thenReturn(1L);
executor.doSnapshot(null);
@@ -292,7 +300,9 @@ public class SnapshotExecutorTest extends BaseStorageTest {
public void testDoSnapshotWithIntervalDist() throws Exception {
final NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setSnapshotLogIndexMargin(5);
- nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
+ ExecutorService testExecutor = JRaftUtils.createExecutor("test-executor", Utils.cpus());
+ executorService = testExecutor;
+ nodeOptions.setCommonExecutor(testExecutor);
Mockito.when(node.getOptions()).thenReturn(nodeOptions);
Mockito.when(fSMCaller.getLastAppliedIndex()).thenReturn(6L);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index 04c36df..0ab6d0b 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.storage.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
@@ -39,6 +40,7 @@ import org.apache.ignite.raft.jraft.storage.BaseStorageTest;
import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -74,6 +76,8 @@ public class LogManagerTest extends BaseStorageTest {
/** Disruptor for this service test. */
private StripedDisruptor disruptor;
+ private ExecutorService executor;
+
@BeforeEach
public void setup() throws Exception {
this.confManager = new ConfigurationManager();
@@ -83,7 +87,8 @@ public class LogManagerTest extends BaseStorageTest {
final LogManagerOptions opts = new LogManagerOptions();
NodeOptions nodeOptions = new NodeOptions();
- nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
+ executor = JRaftUtils.createExecutor("test-executor", Utils.cpus());
+ nodeOptions.setCommonExecutor(executor);
Mockito.when(node.getOptions()).thenReturn(nodeOptions);
opts.setConfigurationManager(this.confManager);
@@ -109,6 +114,7 @@ public class LogManagerTest extends BaseStorageTest {
public void teardown() throws Exception {
this.logStorage.shutdown();
disruptor.shutdown();
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySessionTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySessionTest.java
index 850ce6d..15c5342 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySessionTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySessionTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -76,6 +77,7 @@ public class CopySessionTest {
public void teardown() {
Utils.closeQuietly(this.session);
this.timerManager.shutdown();
+ ExecutorServiceHelper.shutdownAndAwaitTermination(this.nodeOptions.getCommonExecutor());
}
@Test
@@ -94,32 +96,34 @@ public class CopySessionTest {
@Test
public void testOnRpcReturnedEOF() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
- new Thread() {
- @Override
- public void run() {
- try {
- //test join, should return
- session.join();
- latch.countDown();
- }
- catch (final InterruptedException e) {
- // No-op.
- }
+ Thread t = new Thread(() -> {
+ try {
+ //test join, should return
+ session.join();
+ latch.countDown();
}
- }.start();
- assertNull(this.session.getRpcCall());
- final ByteBufferCollector bufRef = ByteBufferCollector.allocate(0);
- this.session.setDestBuf(bufRef);
-
- this.session.onRpcReturned(Status.OK(), raftOpts.getRaftMessagesFactory().getFileResponse().readSize(100).eof(true)
- .data(new ByteString(new byte[100])).build());
- assertEquals(100, bufRef.capacity());
- //should be flip
- assertEquals(0, bufRef.getBuffer().position());
- assertEquals(100, bufRef.getBuffer().remaining());
-
- assertNull(this.session.getRpcCall());
- latch.await();
+ catch (final InterruptedException e) {
+ // No-op.
+ }
+ });
+ try {
+ t.start();
+ assertNull(this.session.getRpcCall());
+ final ByteBufferCollector bufRef = ByteBufferCollector.allocate(0);
+ this.session.setDestBuf(bufRef);
+
+ this.session.onRpcReturned(Status.OK(), raftOpts.getRaftMessagesFactory().getFileResponse().readSize(100).eof(true)
+ .data(new ByteString(new byte[100])).build());
+ assertEquals(100, bufRef.capacity());
+ //should be flip
+ assertEquals(0, bufRef.getBuffer().position());
+ assertEquals(100, bufRef.getBuffer().remaining());
+
+ assertNull(this.session.getRpcCall());
+ latch.await();
+ } finally {
+ t.join();
+ }
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorTest.java
index 7825728..72f5418 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorTest.java
@@ -52,8 +52,11 @@ public class ByteBufferCollectorTest {
public void testMultipleRecycleAtDifferentThread() throws InterruptedException {
final ByteBufferCollector object = ByteBufferCollector.allocateByRecyclers();
final Thread thread1 = new Thread(object::recycle);
- thread1.start();
- thread1.join();
+ try {
+ thread1.start();
+ } finally {
+ thread1.join();
+ }
assertSame(object, ByteBufferCollector.allocateByRecyclers());
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/CountDownEventTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/CountDownEventTest.java
index cdaf2c9..325007a 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/CountDownEventTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/CountDownEventTest.java
@@ -17,10 +17,11 @@
package org.apache.ignite.raft.jraft.util;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.lang.IgniteLogger;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -29,6 +30,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class CountDownEventTest {
private static final IgniteLogger LOG = IgniteLogger.forClass(CountDownEventTest.class);
+ private ExecutorService executor;
+
+ @AfterEach
+ public void teardown() {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
+ }
+
@Test
public void testAwait() throws Exception {
CountDownEvent e = new CountDownEvent();
@@ -36,7 +44,7 @@ public class CountDownEventTest {
e.incrementAndGet();
AtomicLong cost = new AtomicLong(0);
CountDownLatch latch = new CountDownLatch(1);
- Executor executor = Executors.newSingleThreadExecutor();
+ executor = Executors.newSingleThreadExecutor();
Utils.runInThread(executor, new Runnable() {
@Override
public void run() {
@@ -65,7 +73,7 @@ public class CountDownEventTest {
e.incrementAndGet();
e.incrementAndGet();
Thread thread = Thread.currentThread();
- Executor executor = Executors.newSingleThreadExecutor();
+ executor = Executors.newSingleThreadExecutor();
Utils.runInThread(executor, new Runnable() {
@Override
public void run() {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/RecyclableByteBufferListTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/RecyclableByteBufferListTest.java
index 8f744ea..ea7039a 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/RecyclableByteBufferListTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/RecyclableByteBufferListTest.java
@@ -37,8 +37,11 @@ public class RecyclableByteBufferListTest {
public void testMultipleRecycleAtDifferentThread() throws InterruptedException {
final RecyclableByteBufferList object = RecyclableByteBufferList.newInstance();
final Thread thread1 = new Thread(object::recycle);
- thread1.start();
- thread1.join();
+ try {
+ thread1.start();
+ } finally {
+ thread1.join();
+ }
assertSame(object, RecyclableByteBufferList.newInstance());
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/RecyclersTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/RecyclersTest.java
index 9c97397..7602045 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/RecyclersTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/RecyclersTest.java
@@ -54,8 +54,11 @@ public class RecyclersTest {
final Recyclers<RecyclableObject> recyclers = newRecyclers(512);
final RecyclableObject object = recyclers.get();
final Thread thread1 = new Thread(() -> recyclers.recycle(object, object.handle));
- thread1.start();
- thread1.join();
+ try {
+ thread1.start();
+ } finally {
+ thread1.join();
+ }
assertSame(object, recyclers.get());
}
@@ -66,8 +69,11 @@ public class RecyclersTest {
final AtomicReference<IllegalStateException> exceptionStore = new AtomicReference<>();
final Thread thread1 = new Thread(() -> recyclers.recycle(object, object.handle));
- thread1.start();
- thread1.join();
+ try {
+ thread1.start();
+ } finally {
+ thread1.join();
+ }
final Thread thread2 = new Thread(() -> {
try {
@@ -77,8 +83,11 @@ public class RecyclersTest {
exceptionStore.set(e);
}
});
- thread2.start();
- thread2.join();
+ try {
+ thread2.start();
+ } finally {
+ thread2.join();
+ }
assertNotNull(exceptionStore.get());
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
index 5603171..62890f2 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ThreadIdTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.raft.jraft.util;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -48,24 +50,23 @@ public class ThreadIdTest implements ThreadId.OnError {
CountDownLatch latch = new CountDownLatch(1);
- var t = new Thread() {
- @Override
- public void run() {
- ThreadIdTest.this.id.lock();
+ var t = new Thread(() -> {
+ ThreadIdTest.this.id.lock();
- latch.countDown();
- }
- };
-
- t.start();
+ latch.countDown();
+ });
- assertEquals(1, latch.getCount());
+ try {
+ t.start();
- this.id.unlock();
+ assertEquals(1, latch.getCount());
- TestUtils.waitForCondition(() -> latch.getCount() == 0, 10_000);
+ this.id.unlock();
- t.join();
+ TestUtils.waitForCondition(() -> latch.getCount() == 0, 10_000);
+ } finally {
+ t.join();
+ }
}
@Test
@@ -74,19 +75,21 @@ public class ThreadIdTest implements ThreadId.OnError {
assertEquals(100, this.errorCode);
this.id.lock();
CountDownLatch latch = new CountDownLatch(1);
- new Thread() {
- @Override
- public void run() {
- ThreadIdTest.this.id.setError(99);
- latch.countDown();
- }
- }.start();
- latch.await();
- //just go into pending errors.
- assertEquals(100, this.errorCode);
- //invoke onError when unlock
- this.id.unlock();
- assertEquals(99, this.errorCode);
+ Thread t = new Thread(() -> {
+ ThreadIdTest.this.id.setError(99);
+ latch.countDown();
+ });
+ try {
+ t.start();
+ latch.await();
+ //just go into pending errors.
+ assertEquals(100, this.errorCode);
+ //invoke onError when unlock
+ this.id.unlock();
+ assertEquals(99, this.errorCode);
+ } finally {
+ t.join();
+ }
}
@Test
@@ -94,20 +97,19 @@ public class ThreadIdTest implements ThreadId.OnError {
AtomicInteger lockSuccess = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(10);
this.id.lock();
+ ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
- new Thread() {
- @Override
- public void run() {
- if (ThreadIdTest.this.id.lock() != null) {
- lockSuccess.incrementAndGet();
- }
- latch.countDown();
+ executor.execute(() -> {
+ if (ThreadIdTest.this.id.lock() != null) {
+ lockSuccess.incrementAndGet();
}
- }.start();
+ latch.countDown();
+ });
}
this.id.unlockAndDestroy();
latch.await();
assertEquals(0, lockSuccess.get());
assertNull(this.id.lock());
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/UtilsTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/UtilsTest.java
index 0481ed0..f8cc0c2 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/UtilsTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/UtilsTest.java
@@ -18,11 +18,12 @@ package org.apache.ignite.raft.jraft.util;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.error.RaftError;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -36,7 +37,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*
*/
public class UtilsTest {
- private Executor executor = Executors.newSingleThreadExecutor();
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ @AfterEach
+ public void teardown() throws Exception {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
+ }
@Test
public void testRunThread() throws Exception {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/LongHeldDetectingReadWriteLockTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/LongHeldDetectingReadWriteLockTest.java
index dcab24b..0e26ce4 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/LongHeldDetectingReadWriteLockTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/LongHeldDetectingReadWriteLockTest.java
@@ -55,37 +55,56 @@ public class LongHeldDetectingReadWriteLockTest {
};
final CountDownLatch latch = new CountDownLatch(1);
- new Thread(() -> {
- readWriteLock.writeLock().lock();
- latch.countDown();
- try {
- Thread.sleep(2000);
- }
- catch (final InterruptedException e) {
- LOG.error("Thread was interrupted", e);
- }
- finally {
- readWriteLock.writeLock().unlock();
- }
- }, "write-lock-thread") //
- .start();
+ Thread t1 = null;
+ Thread t2 = null;
+ Thread t3 = null;
+ try {
+ t1 = new Thread(() -> {
+ readWriteLock.writeLock().lock();
+ latch.countDown();
+ try {
+ Thread.sleep(2000);
+ }
+ catch (final InterruptedException e) {
+ LOG.error("Thread was interrupted", e);
+ }
+ finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }, "write-lock-thread");
+
+ t1.start();
+
+ latch.await();
+
+ final CountDownLatch latch1 = new CountDownLatch(2);
+ t2 = new Thread(() -> {
+ readWriteLock.readLock().lock();
+ readWriteLock.readLock().unlock();
+ latch1.countDown();
+ }, "read-lock-thread-1");
+
+ t2.start();
+
+ t3 = new Thread(() -> {
+ readWriteLock.readLock().lock();
+ readWriteLock.readLock().unlock();
+ latch1.countDown();
+ }, "read-lock-thread-2");
- latch.await();
+ t3.start();
- final CountDownLatch latch1 = new CountDownLatch(2);
- new Thread(() -> {
- readWriteLock.readLock().lock();
- readWriteLock.readLock().unlock();
- latch1.countDown();
- }, "read-lock-thread-1").start();
+ latch1.await();
+ } finally {
+ if (t1 != null)
+ t1.join();
- new Thread(() -> {
- readWriteLock.readLock().lock();
- readWriteLock.readLock().unlock();
- latch1.countDown();
- }, "read-lock-thread-2").start();
+ if (t2 != null)
+ t2.join();
- latch1.await();
+ if (t3 != null)
+ t3.join();
+ }
}
@Test
@@ -109,36 +128,55 @@ public class LongHeldDetectingReadWriteLockTest {
};
final CountDownLatch latch = new CountDownLatch(1);
- new Thread(() -> {
- readWriteLock.readLock().lock();
- latch.countDown();
- try {
- Thread.sleep(2000);
- }
- catch (final InterruptedException e) {
- LOG.error("Thread was interrupted", e);
- }
- finally {
- readWriteLock.readLock().unlock();
- }
- }, "read-lock-thread") //
- .start();
+ Thread t1 = null;
+ Thread t2 = null;
+ Thread t3 = null;
+ try {
+ t1 = new Thread(() -> {
+ readWriteLock.readLock().lock();
+ latch.countDown();
+ try {
+ Thread.sleep(2000);
+ }
+ catch (final InterruptedException e) {
+ LOG.error("Thread was interrupted", e);
+ }
+ finally {
+ readWriteLock.readLock().unlock();
+ }
+ }, "read-lock-thread");
+
+ t1.start();
+
+ latch.await();
+
+ final CountDownLatch latch1 = new CountDownLatch(2);
+ t2 = new Thread(() -> {
+ readWriteLock.writeLock().lock();
+ readWriteLock.writeLock().unlock();
+ latch1.countDown();
+ }, "write-lock-thread-1");
+
+ t2.start();
+
+ t3 = new Thread(() -> {
+ readWriteLock.writeLock().lock();
+ readWriteLock.writeLock().unlock();
+ latch1.countDown();
+ }, "write-lock-thread-2");
- latch.await();
+ t3.start();
- final CountDownLatch latch1 = new CountDownLatch(2);
- new Thread(() -> {
- readWriteLock.writeLock().lock();
- readWriteLock.writeLock().unlock();
- latch1.countDown();
- }, "write-lock-thread-1").start();
+ latch1.await();
+ } finally {
+ if (t1 != null)
+ t1.join();
- new Thread(() -> {
- readWriteLock.writeLock().lock();
- readWriteLock.writeLock().unlock();
- latch1.countDown();
- }, "write-lock-thread-2").start();
+ if (t2 != null)
+ t2.join();
- latch1.await();
+ if (t3 != null)
+ t3.join();
+ }
}
}
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index d63fa39..6b02c6a 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -152,6 +152,13 @@
<type>test-jar</type>
</dependency>
+ <!-- Logging in tests -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- Benchmarks dependencies -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
index df756c0..0825233 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
@@ -396,9 +396,9 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest {
Random r = new Random();
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread(new Runnable() {
- @Override public void run() {
+ try {
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
try {
startBar.await();
}
@@ -456,19 +456,19 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest {
}
}
}
- }
- });
-
- threads[i].setName("Worker" + i);
- threads[i].start();
- }
+ });
- Thread.sleep(duration);
+ threads[i].setName("Worker" + i);
+ threads[i].start();
+ }
- stop.set(true);
+ Thread.sleep(duration);
- for (Thread thread : threads)
- thread.join();
+ stop.set(true);
+ } finally {
+ for (Thread thread : threads)
+ thread.join();
+ }
log.info("After test rLocks={} wLocks={} fLocks={}", rLocks.sum(), wLocks.sum(), fLocks.sum());