You are viewing a plain-text version of this content. Its canonical link was a hyperlink in the original page and was lost in the plain-text conversion.
Posted to commits@ignite.apache.org by sd...@apache.org on 2023/01/25 12:38:57 UTC

[ignite-3] 01/02: IGNITE-18446 Add busylock to raft service.

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-18446
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit bdbea97b9a6a3a800c2c84826a99207956a8750c
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Jan 18 14:58:17 2023 +0200

    IGNITE-18446 Add busylock to raft service.
---
 .../cluster/management/raft/CmgRaftService.java    |  4 +-
 .../matchers/CompletableFutureMatcher.java         | 35 ++++++++--
 .../impl/ItMetaStorageManagerImplTest.java         | 52 ++++++++++++--
 .../metastorage/impl/MetaStorageManagerImpl.java   | 21 ++++++
 .../metastorage/impl/MetaStorageService.java       |  6 +-
 .../metastorage/impl/MetaStorageServiceImpl.java   |  5 ++
 .../ignite/internal/raft/RaftGroupServiceImpl.java | 79 ++++++++++++++--------
 7 files changed, 159 insertions(+), 43 deletions(-)

diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index 9459c855d0..75d4525ac7 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -193,9 +193,9 @@ public class CmgRaftService {
     }
 
     /**
-     * Returns a list of consistent IDs of the voting nodes of the CMG.
+     * Returns a set of consistent IDs of the voting nodes of the CMG.
      *
-     * @return List of consistent IDs of the voting nodes of the CMG.
+     * @return Set of consistent IDs of the voting nodes of the CMG.
      */
     public Set<String> nodeNames() {
         List<Peer> peers = raftService.peers();
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index bdbd3df5d9..18bb7661cd 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -95,7 +96,7 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu
             }
 
             return matcher.matches(res);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+        } catch (InterruptedException | ExecutionException | TimeoutException | CancellationException e) {
             if (causeOfFail != null) {
                 assertTrue(hasCause(e, causeOfFail, null));
 
@@ -150,10 +151,9 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu
     }
 
     /**
-     * Creates a matcher that matches a future that completes successfully and decently fast.
+     * Creates a matcher that matches a future that completes exceptionally and decently fast.
      *
-     * @param cause If {@code null}, the future should be completed successfully, otherwise it specifies the class of cause
-     *                    throwable.
+     * @param cause The class of cause throwable.
      * @return matcher.
      */
     public static CompletableFutureMatcher<Object> willFailFast(Class<? extends Throwable> cause) {
@@ -161,18 +161,39 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu
     }
 
     /**
-     * Creates a matcher that matches a future that completes successfully with any result within the given timeout.
+     * Creates a matcher that matches a future that completes exceptionally within the given timeout.
      *
      * @param time Timeout.
      * @param timeUnit Time unit for timeout.
-     * @param cause If {@code null}, the future should be completed successfully, otherwise it specifies the class of cause
-     *                    throwable.
+     * @param cause The class of cause throwable.
      * @return matcher.
      */
     public static CompletableFutureMatcher<Object> willFailIn(int time, TimeUnit timeUnit, Class<? extends Throwable> cause) {
+        assert cause != null;
+
         return new CompletableFutureMatcher<>(anything(), time, timeUnit, cause);
     }
 
+    /**
+     * Creates a matcher that matches a future that will be cancelled and decently fast.
+     *
+     * @return matcher.
+     */
+    public static CompletableFutureMatcher<Object> willBeCancelledFast() {
+        return willBeCancelledIn(1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates a matcher that matches a future that will be cancelled within the given timeout.
+     *
+     * @param time Timeout.
+     * @param timeUnit Time unit for timeout.
+     * @return matcher.
+     */
+    public static CompletableFutureMatcher<Object> willBeCancelledIn(int time, TimeUnit timeUnit) {
+        return new CompletableFutureMatcher<>(anything(), time, timeUnit, CancellationException.class);
+    }
+
     /**
      * A shorter version of {@link #willBe} to be used with some matchers for aesthetic reasons.
      */
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index ba2481725b..ecf3406336 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -21,6 +21,8 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBeCancelledFast;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -41,14 +43,13 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -78,9 +79,11 @@ public class ItMetaStorageManagerImplTest {
 
     private ClusterService clusterService;
 
-    private RaftManager raftManager;
+    private Loza raftManager;
 
-    private MetaStorageManager metaStorageManager;
+    private KeyValueStorage storage;
+
+    private MetaStorageManagerImpl metaStorageManager;
 
     @BeforeEach
     void setUp(TestInfo testInfo, @WorkDirectory Path workDir, @InjectConfiguration RaftConfiguration raftConfiguration)
@@ -98,7 +101,7 @@ public class ItMetaStorageManagerImplTest {
         when(cmgManager.metaStorageNodes())
                 .thenReturn(completedFuture(Set.of(clusterService.localConfiguration().getName())));
 
-        var storage = new RocksDbKeyValueStorage(clusterService.localConfiguration().getName(), workDir.resolve("metastorage"));
+        storage = new RocksDbKeyValueStorage(clusterService.localConfiguration().getName(), workDir.resolve("metastorage"));
 
         metaStorageManager = new MetaStorageManagerImpl(
                 vaultManager,
@@ -236,4 +239,43 @@ public class ItMetaStorageManagerImplTest {
             public void onError(Throwable e) {}
         };
     }
+
+    @Test
+    void testMetaStorageStopClosesRaftService() throws Exception {
+        MetaStorageServiceImpl svc = metaStorageManager.metaStorageServiceFuture().join();
+
+        metaStorageManager.stop();
+
+        CompletableFuture<Entry> fut = svc.get(ByteArray.fromString("ignored"));
+
+        assertThat(fut, willFailFast(NodeStoppingException.class));
+    }
+
+    @Test
+    void testMetaStorageStopBeforeRaftServiceStarted() throws Exception {
+        metaStorageManager.stop(); // Close MetaStorage that is created in setUp.
+
+        ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class);
+
+        Set<String> msNodes = Set.of(clusterService.localConfiguration().getName());
+        CompletableFuture<Set<String>> cmgFut = new CompletableFuture<>();
+
+        when(cmgManager.metaStorageNodes()).thenReturn(cmgFut);
+
+        metaStorageManager = new MetaStorageManagerImpl(
+                vaultManager,
+                clusterService,
+                cmgManager,
+                raftManager,
+                storage
+        );
+
+        metaStorageManager.stop();
+
+        // Unblock the future so raft service can be initialized. Although the future should be cancelled already by the
+        // stop method.
+        cmgFut.complete(msNodes);
+
+        assertThat(metaStorageManager.metaStorageServiceFuture(), willBeCancelledFast());
+    }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index eec4461432..d40aacaf17 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -68,6 +68,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.TopologyEventHandler;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * MetaStorage manager.
@@ -320,10 +321,25 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
 
         IgniteUtils.closeAll(
                 () -> raftMgr.stopRaftNodes(MetastorageGroupId.INSTANCE),
+                this::closeMetaStorageService,
                 storage::close
         );
     }
 
+    private void closeMetaStorageService() {
+        if (metaStorageSvcFut.isCancelled() || metaStorageSvcFut.isCompletedExceptionally()) {
+            return;
+        }
+
+        assert metaStorageSvcFut.isDone();
+
+        MetaStorageServiceImpl metaStorageService = metaStorageSvcFut.join();
+
+        assert metaStorageService != null;
+
+        metaStorageService.close();
+    }
+
     @Override
     public long appliedRevision() {
         return appliedRevision;
@@ -791,4 +807,9 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
             }
         }
     }
+
+    @TestOnly
+    CompletableFuture<MetaStorageServiceImpl> metaStorageServiceFuture() {
+        return metaStorageSvcFut;
+    }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
index b774971013..b1f7f7a7f1 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.If;
@@ -35,7 +36,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Defines interface for access to a meta storage service.
  */
-public interface MetaStorageService {
+public interface MetaStorageService extends ManuallyCloseable {
     /**
      * Retrieves an entry for the given key.
      *
@@ -309,4 +310,7 @@ public interface MetaStorageService {
      * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
      */
     CompletableFuture<Void> closeCursors(String nodeId);
+
+    @Override
+    void close();
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index a1bbe42ca1..1d64da7871 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -314,6 +314,11 @@ public class MetaStorageServiceImpl implements MetaStorageService {
         return metaStorageRaftGrpSvc.run(commandsFactory.cursorsCloseCommand().nodeId(nodeId).build());
     }
 
+    @Override
+    public void close() {
+        metaStorageRaftGrpSvc.shutdown();
+    }
+
     private static List<OperationInfo> toOperationInfos(Collection<Operation> ops, MetaStorageCommandsFactory commandsFactory) {
         List<OperationInfo> res = new ArrayList<>(ops.size());
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 5f41d131c1..af07da5175 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -58,7 +58,9 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkMessage;
@@ -108,6 +110,9 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     /** Executor for scheduling retries of {@link RaftGroupServiceImpl#sendWithRetry} invocations. */
     private final ScheduledExecutorService executor;
 
+    /** Busy lock. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
     /**
      * Constructor.
      *
@@ -477,7 +482,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
     @Override
     public void shutdown() {
-        // No-op.
+        busyLock.block();
     }
 
     @Override
@@ -507,38 +512,48 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     private <R extends NetworkMessage> void sendWithRetry(
             Peer peer, Function<Peer, ? extends NetworkMessage> requestFactory, long stopTime, CompletableFuture<R> fut
     ) {
-        if (currentTimeMillis() >= stopTime) {
-            fut.completeExceptionally(new TimeoutException());
+        if (!busyLock.enterBusy()) {
+            fut.completeExceptionally(new NodeStoppingException());
 
             return;
         }
 
-        NetworkMessage request = requestFactory.apply(peer);
-
-        //TODO: IGNITE-15389 org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock inside
-        resolvePeer(peer)
-                .thenCompose(node -> cluster.messagingService().invoke(node, request, rpcTimeout))
-                .whenCompleteAsync((resp, err) -> {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("sendWithRetry resp={} from={} to={} err={}",
-                                S.toString(resp),
-                                cluster.topologyService().localMember().address(),
-                                peer.consistentId(),
-                                err == null ? null : err.getMessage());
-                    }
+        try {
+            if (currentTimeMillis() >= stopTime) {
+                fut.completeExceptionally(new TimeoutException());
 
-                    if (err != null) {
-                        handleThrowable(err, peer, request, requestFactory, stopTime, fut);
-                    } else if (resp instanceof ErrorResponse) {
-                        handleErrorResponse((ErrorResponse) resp, peer, request, requestFactory, stopTime, fut);
-                    } else if (resp instanceof SMErrorResponse) {
-                        handleSmErrorResponse((SMErrorResponse) resp, fut);
-                    } else {
-                        leader = peer; // The OK response was received from a leader.
+                return;
+            }
 
-                        fut.complete((R) resp);
-                    }
-                });
+            NetworkMessage request = requestFactory.apply(peer);
+
+            //TODO: IGNITE-15389 org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock inside
+            resolvePeer(peer)
+                    .thenCompose(node -> cluster.messagingService().invoke(node, request, rpcTimeout))
+                    .whenCompleteAsync((resp, err) -> {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("sendWithRetry resp={} from={} to={} err={}",
+                                    S.toString(resp),
+                                    cluster.topologyService().localMember().address(),
+                                    peer.consistentId(),
+                                    err == null ? null : err.getMessage());
+                        }
+
+                        if (err != null) {
+                            handleThrowable(err, peer, request, requestFactory, stopTime, fut);
+                        } else if (resp instanceof ErrorResponse) {
+                            handleErrorResponse((ErrorResponse) resp, peer, request, requestFactory, stopTime, fut);
+                        } else if (resp instanceof SMErrorResponse) {
+                            handleSmErrorResponse((SMErrorResponse) resp, fut);
+                        } else {
+                            leader = peer; // The OK response was received from a leader.
+
+                            fut.complete((R) resp);
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     private void handleThrowable(
@@ -550,7 +565,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
             CompletableFuture<? extends NetworkMessage> fut
     ) {
         if (recoverable(err)) {
-            LOG.warn(
+            LOG.trace(
                     "Recoverable error during the request type={} occurred (will be retried on the randomly selected node): ",
                     err, sentRequest.getClass().getSimpleName()
             );
@@ -679,6 +694,14 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
         int lastPeerIndex = excludedPeer == null ? -1 : peers0.indexOf(excludedPeer);
 
+        if (peers0.size() == 1) {
+            if (lastPeerIndex != -1) {
+                return excludedPeer;
+            } else {
+                return peers0.get(0);
+            }
+        }
+
         ThreadLocalRandom random = current();
 
         int newIdx = 0;