You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/11/09 09:00:44 UTC

(ignite-3) branch main updated: IGNITE-20692 Add local replica events (#2813)

This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ac43bdb65 IGNITE-20692 Add local replica events (#2813)
9ac43bdb65 is described below

commit 9ac43bdb65b5a79aa8bb9fd62bef3c510d77dc0a
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Thu Nov 9 11:00:39 2023 +0200

    IGNITE-20692 Add local replica events (#2813)
---
 .../catalog/events/CatalogEventParameters.java     |   4 +-
 ...tParameters.java => CausalEventParameters.java} |   6 +-
 .../ignite/internal/event/EventParameters.java     |  21 +-
 .../ignite/internal/event/EventProducerTest.java   |   2 +-
 .../event/PrimaryReplicaEventParameters.java       |   4 +-
 .../internal/replicator/LocalReplicaEvent.java}    |  27 +--
 .../replicator/LocalReplicaEventParameters.java}   |  42 ++--
 .../ignite/internal/replicator/ReplicaManager.java | 221 ++++++++++++---------
 .../internal/replicator/ReplicaManagerTest.java    | 150 ++++++++++++++
 .../apache/ignite/distributed/ItTxTestCluster.java |  34 ++--
 10 files changed, 343 insertions(+), 168 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
index a7f8bcc515..fc71fe1475 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.catalog.events;
 
-import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.event.CausalEventParameters;
 
 /**
  * Base class for Catalog event parameters.
  */
-public abstract class CatalogEventParameters extends EventParameters {
+public abstract class CatalogEventParameters extends CausalEventParameters {
     private final int catalogVersion;
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/event/CausalEventParameters.java
similarity index 86%
copy from modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
copy to modules/core/src/main/java/org/apache/ignite/internal/event/CausalEventParameters.java
index d52f76cc3c..b97e18fbe1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/event/CausalEventParameters.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.event;
 
-/** Event parameters. This type is passed to the {@link EventListener#notify(EventParameters, Throwable)}. */
-public abstract class EventParameters {
+/** {@link EventParameters} implementation that contains a Causality Token. */
+public abstract class CausalEventParameters implements EventParameters {
     private final long causalityToken;
 
     /**
@@ -26,7 +26,7 @@ public abstract class EventParameters {
      *
      * @param causalityToken Causality token.
      */
-    public EventParameters(long causalityToken) {
+    public CausalEventParameters(long causalityToken) {
         this.causalityToken = causalityToken;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
index d52f76cc3c..05641cbc2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
@@ -18,24 +18,5 @@
 package org.apache.ignite.internal.event;
 
 /** Event parameters. This type is passed to the {@link EventListener#notify(EventParameters, Throwable)}. */
-public abstract class EventParameters {
-    private final long causalityToken;
-
-    /**
-     * Constructor.
-     *
-     * @param causalityToken Causality token.
-     */
-    public EventParameters(long causalityToken) {
-        this.causalityToken = causalityToken;
-    }
-
-    /**
-     * Returns a causality token.
-     * The token is required for represent a causality dependency between several events.
-     * The earlier the event occurred, the lower the value of the token.
-     */
-    public long causalityToken() {
-        return causalityToken;
-    }
+public interface EventParameters {
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java
index d60dc196e6..0c4d504406 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java
@@ -120,7 +120,7 @@ public class EventProducerTest {
         TEST
     }
 
-    private static class TestEventParameters extends EventParameters {
+    private static class TestEventParameters extends CausalEventParameters {
         private TestEventParameters(long causalityToken) {
             super(causalityToken);
         }
diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
index ff6db744ab..197b774c3c 100644
--- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.internal.placementdriver.event;
 
-import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.event.CausalEventParameters;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 
 /** Primary replica event parameters. There are properties which associate with a concrete primary replica. */
-public class PrimaryReplicaEventParameters extends EventParameters {
+public class PrimaryReplicaEventParameters extends CausalEventParameters {
     private final ReplicationGroupId groupId;
 
     private final String leaseholder;
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEvent.java
similarity index 56%
copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
copy to modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEvent.java
index a7f8bcc515..fbf891504d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEvent.java
@@ -15,32 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.catalog.events;
+package org.apache.ignite.internal.replicator;
 
-import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.event.Event;
 
 /**
- * Base class for Catalog event parameters.
+ * Events produced by {@link ReplicaManager}.
  */
-public abstract class CatalogEventParameters extends EventParameters {
-    private final int catalogVersion;
-
+public enum LocalReplicaEvent implements Event {
     /**
-     * Constructor.
-     *
-     * @param causalityToken Causality token.
-     * @param catalogVersion Catalog version.
+     * Fired after a replica has been started on the local node.
      */
-    public CatalogEventParameters(long causalityToken, int catalogVersion) {
-        super(causalityToken);
-
-        this.catalogVersion = catalogVersion;
-    }
+    AFTER_REPLICA_STARTED,
 
     /**
-     * Returns catalog version.
+     * Fired before a replica has been stopped on the local node.
      */
-    public int catalogVersion() {
-        return catalogVersion;
-    }
+    BEFORE_REPLICA_STOPPED;
 }
diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEventParameters.java
similarity index 56%
copy from modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
copy to modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEventParameters.java
index ff6db744ab..b7be9f2ab4 100644
--- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEventParameters.java
@@ -15,38 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.placementdriver.event;
+package org.apache.ignite.internal.replicator;
 
 import org.apache.ignite.internal.event.EventParameters;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 
-/** Primary replica event parameters. There are properties which associate with a concrete primary replica. */
-public class PrimaryReplicaEventParameters extends EventParameters {
+/**
+ * Parameters of events produced by {@link ReplicaManager}.
+ */
+public class LocalReplicaEventParameters implements EventParameters {
+    /** ID of the created replica. */
     private final ReplicationGroupId groupId;
 
-    private final String leaseholder;
-
     /**
      * Constructor.
      *
-     * @param causalityToken Causality token.
-     * @param groupId Replication group ID.
-     * @param leaseholder Leaseholder node consistent ID.
+     * @param groupId Replication Group ID.
      */
-    public PrimaryReplicaEventParameters(long causalityToken, ReplicationGroupId groupId, String leaseholder) {
-        super(causalityToken);
-
+    public LocalReplicaEventParameters(ReplicationGroupId groupId) {
         this.groupId = groupId;
-        this.leaseholder = leaseholder;
     }
 
-    /** Replication group ID. */
     public ReplicationGroupId groupId() {
         return groupId;
     }
 
-    /** Returns leaseholder node consistent ID. */
-    public String leaseholder() {
-        return leaseholder;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        LocalReplicaEventParameters that = (LocalReplicaEventParameters) o;
+
+        return groupId.equals(that.groupId);
+    }
+
+    @Override
+    public int hashCode() {
+        return groupId.hashCode();
     }
 }
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index b3677517ea..ae66627b35 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.replicator;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
+import static org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.io.IOException;
@@ -40,6 +43,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongSupplier;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -66,9 +70,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ChannelType;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -79,11 +81,12 @@ import org.jetbrains.annotations.TestOnly;
 
 /**
  * Replica manager maintains {@link Replica} instances on an Ignite node.
- * Manager allows starting, stopping, getting a {@link Replica} by its unique id.
- * Only a single instance of the class exists in Ignite node.
- * This class allow to start/stop/get a replica.
+ *
+ * <p>Manager allows starting, stopping, getting a {@link Replica} by its unique id.
+ *
+ * <p>Only a single instance of the class exists in Ignite node.
  */
-public class ReplicaManager implements IgniteComponent {
+public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, LocalReplicaEventParameters> implements IgniteComponent {
     /** Default Idle safe time propagation period for tests. */
     public static final int DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS = 1000;
 
@@ -99,9 +102,9 @@ public class ReplicaManager implements IgniteComponent {
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
     /** Prevents double stopping of the component. */
-    private final AtomicBoolean metaStorageNodes = new AtomicBoolean();
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
 
-    /** Meta storage service. */
+    /** Meta storage node names. */
     private final CompletableFuture<Set<String>> msNodes = new CompletableFuture<>();
 
     /** Cluster network service. */
@@ -220,10 +223,16 @@ public class ReplicaManager implements IgniteComponent {
             return;
         }
 
+        assert correlationId != null;
+
         ReplicaRequest request = (ReplicaRequest) message;
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Failed to process replica request (the node is stopping) [request={}].", request);
+            }
+
+            return;
         }
 
         try {
@@ -236,24 +245,21 @@ public class ReplicaManager implements IgniteComponent {
 
                     if (!replicaFut.isDone()) {
                         replicaFut.whenComplete((createdReplica, ex) -> {
-                                    if (ex != null) {
-                                        clusterNetSvc.messagingService().respond(
-                                                senderConsistentId,
-                                                REPLICA_MESSAGES_FACTORY
-                                                        .errorReplicaResponse()
-                                                        .throwable(ex)
-                                                        .build(),
-                                                correlationId);
-                                    } else {
-                                        createdReplica.ready().thenAccept(unused ->
-                                                IgniteUtils.inBusyLock(
-                                                        busyLock,
-                                                        () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
-                                                )
-                                        );
-                                    }
-                                }
-                        );
+                            if (ex != null) {
+                                clusterNetSvc.messagingService().respond(
+                                        senderConsistentId,
+                                        REPLICA_MESSAGES_FACTORY
+                                                .errorReplicaResponse()
+                                                .throwable(ex)
+                                                .build(),
+                                        correlationId);
+                            } else {
+                                createdReplica.ready().thenRun(() -> inBusyLock(
+                                        busyLock,
+                                        () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
+                                ));
+                            }
+                        });
                     } else {
                         sendAwaitReplicaResponse(senderConsistentId, correlationId);
                     }
@@ -287,16 +293,16 @@ public class ReplicaManager implements IgniteComponent {
 
             CompletableFuture<ReplicaResult> resFut = replica.processRequest(request, senderId);
 
-            resFut.handle((res, ex) -> {
+            resFut.whenComplete((res, ex) -> {
                 NetworkMessage msg;
 
                 if (ex == null) {
                     msg = prepareReplicaResponse(sendTimestamp, res.result());
                 } else {
                     if (indicatesUnexpectedProblem(ex)) {
-                        LOG.warn("Failed to process replica request [request={}]", ex, request);
+                        LOG.warn("Failed to process replica request [request={}].", ex, request);
                     } else {
-                        LOG.debug("Failed to process replica request [request={}]", ex, request);
+                        LOG.debug("Failed to process replica request [request={}].", ex, request);
                     }
 
                     msg = prepareReplicaErrorResponse(sendTimestamp, ex);
@@ -317,27 +323,23 @@ public class ReplicaManager implements IgniteComponent {
                 if (ex == null && res.replicationFuture() != null) {
                     assert request instanceof PrimaryReplicaRequest;
 
-                    res.replicationFuture().handle((res0, ex0) -> {
+                    res.replicationFuture().whenComplete((res0, ex0) -> {
                         NetworkMessage msg0;
 
                         LOG.debug("Sending delayed response for replica request [request={}]", request);
 
-                        if (ex == null) {
+                        if (ex0 == null) {
                             msg0 = prepareReplicaResponse(sendTimestamp, res0);
                         } else {
-                            LOG.warn("Failed to process delayed response [request={}]", ex, request);
+                            LOG.warn("Failed to process delayed response [request={}]", ex0, request);
 
-                            msg0 = prepareReplicaErrorResponse(sendTimestamp, ex);
+                            msg0 = prepareReplicaErrorResponse(sendTimestamp, ex0);
                         }
 
                         // Using strong send here is important to avoid a reordering with a normal response.
                         clusterNetSvc.messagingService().send(senderConsistentId, ChannelType.DEFAULT, msg0);
-
-                        return null;
                     });
                 }
-
-                return null;
             });
         } finally {
             busyLock.leaveBusy();
@@ -354,7 +356,7 @@ public class ReplicaManager implements IgniteComponent {
      * @param ex An exception
      * @return True if this exception has thrown due to timeout or connection problem, false otherwise.
      */
-    private static boolean isConnectivityRelatedException(Throwable ex) {
+    private static boolean isConnectivityRelatedException(@Nullable Throwable ex) {
         if (ex instanceof ExecutionException || ex instanceof CompletionException) {
             ex = ex.getCause();
         }
@@ -367,10 +369,16 @@ public class ReplicaManager implements IgniteComponent {
             return;
         }
 
+        assert correlationId != null;
+
         var msg = (PlacementDriverReplicaMessage) msg0;
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Failed to process placement driver message (the node is stopping) [msg={}].", msg);
+            }
+
+            return;
         }
 
         try {
@@ -382,7 +390,7 @@ public class ReplicaManager implements IgniteComponent {
                         if (ex == null) {
                             clusterNetSvc.messagingService().respond(senderConsistentId, response, correlationId);
                         } else if (!(unwrapCause(ex) instanceof NodeStoppingException)) {
-                            LOG.error("Failed to process placement driver message [msg={}]", ex, msg);
+                            LOG.error("Failed to process placement driver message [msg={}].", ex, msg);
                         }
                     });
         } finally {
@@ -396,7 +404,7 @@ public class ReplicaManager implements IgniteComponent {
      * @param groupId Replication group id.
      * @param redirectNodeId Node consistent id to redirect.
      */
-    private void stopLeaseProlongation(ReplicationGroupId groupId, String redirectNodeId) {
+    private void stopLeaseProlongation(ReplicationGroupId groupId, @Nullable String redirectNodeId) {
         LOG.info("The replica does not meet the requirements for the leaseholder [groupId={}, redirectNodeId={}]", groupId, redirectNodeId);
 
         msNodes.thenAccept(nodeIds -> {
@@ -423,9 +431,10 @@ public class ReplicaManager implements IgniteComponent {
      * @param raftClient Topology aware Raft client.
      * @param storageIndexTracker Storage index tracker.
      * @throws NodeStoppingException If node is stopping.
-     * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with the same replication group id has already been started.
+     * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with the same replication group id has already been
+     *         started.
      */
-    public void startReplica(
+    public CompletableFuture<Replica> startReplica(
             ReplicationGroupId replicaGrpId,
             CompletableFuture<Void> whenReplicaReady,
             ReplicaListener listener,
@@ -437,7 +446,7 @@ public class ReplicaManager implements IgniteComponent {
         }
 
         try {
-            startReplicaInternal(replicaGrpId, whenReplicaReady, listener, raftClient, storageIndexTracker);
+            return startReplicaInternal(replicaGrpId, whenReplicaReady, listener, raftClient, storageIndexTracker);
         } finally {
             busyLock.leaveBusy();
         }
@@ -452,7 +461,7 @@ public class ReplicaManager implements IgniteComponent {
      * @param raftClient Topology aware Raft client.
      * @param storageIndexTracker Storage index tracker.
      */
-    private void startReplicaInternal(
+    private CompletableFuture<Replica> startReplicaInternal(
             ReplicationGroupId replicaGrpId,
             CompletableFuture<Void> whenReplicaReady,
             ReplicaListener listener,
@@ -472,19 +481,31 @@ public class ReplicaManager implements IgniteComponent {
                 placementDriver
         );
 
-        replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
-            if (replicaFut == null) {
+        CompletableFuture<Replica> replicaFuture = replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
+            if (existingReplicaFuture == null || existingReplicaFuture.isDone()) {
+                assert existingReplicaFuture == null || isCompletedSuccessfully(existingReplicaFuture);
+
                 return completedFuture(newReplica);
             } else {
-                if (replicaFut.isDone() && !replicaFut.isCancelled() && !replicaFut.isCompletedExceptionally()) {
-                    return completedFuture(newReplica);
-                }
-
-                replicaFut.complete(newReplica);
+                existingReplicaFuture.complete(newReplica);
 
-                return replicaFut;
+                return existingReplicaFuture;
             }
         });
+
+        var eventParams = new LocalReplicaEventParameters(replicaGrpId);
+
+        return fireEvent(AFTER_REPLICA_STARTED, eventParams)
+                .exceptionally(e -> {
+                    LOG.error("Error when notifying about AFTER_REPLICA_STARTED event.", e);
+
+                    return null;
+                })
+                .thenCompose(v -> replicaFuture);
+    }
+
+    private static boolean isCompletedSuccessfully(CompletableFuture<?> future) {
+        return future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally();
     }
 
     /**
@@ -513,34 +534,53 @@ public class ReplicaManager implements IgniteComponent {
      * @return True if the replica is found and closed, false otherwise.
      */
     private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId replicaGrpId) {
-        CompletableFuture<Replica> removed = replicas.remove(replicaGrpId);
-
-        if (removed != null) {
-            if (!removed.isDone()) {
-                removed.completeExceptionally(new ReplicaStoppingException(
-                        replicaGrpId,
-                        clusterNetSvc.topologyService().localMember()
-                ));
+        var isRemovedFuture = new CompletableFuture<Boolean>();
+
+        var eventParams = new LocalReplicaEventParameters(replicaGrpId);
+
+        fireEvent(BEFORE_REPLICA_STOPPED, eventParams).whenComplete((v, e) -> {
+            if (e != null) {
+                LOG.error("Error when notifying about BEFORE_REPLICA_STOPPED event.", e);
             }
 
-            if (!removed.isCompletedExceptionally()) {
-                return removed
-                        .thenCompose(Replica::shutdown)
-                        .handle((notUsed, throwable) -> {
-                            if (throwable == null) {
-                                return true;
-                            } else {
-                                LOG.error("Failed to stop replica [replicaGrpId={}]", throwable, replicaGrpId);
+            if (!busyLock.enterBusy()) {
+                isRemovedFuture.completeExceptionally(new NodeStoppingException());
 
-                                return false;
-                            }
-                        });
+                return;
             }
 
-            return completedFuture(true);
-        }
+            try {
+                replicas.compute(replicaGrpId, (grpId, replicaFuture) -> {
+                    if (replicaFuture == null) {
+                        isRemovedFuture.complete(false);
+                    } else if (!replicaFuture.isDone()) {
+                        ClusterNode localMember = clusterNetSvc.topologyService().localMember();
+
+                        replicaFuture.completeExceptionally(new ReplicaStoppingException(grpId, localMember));
+
+                        isRemovedFuture.complete(true);
+                    } else if (!isCompletedSuccessfully(replicaFuture)) {
+                        isRemovedFuture.complete(true);
+                    } else {
+                        replicaFuture
+                                .thenCompose(Replica::shutdown)
+                                .whenComplete((notUsed, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.error("Failed to stop replica [replicaGrpId={}].", throwable, grpId);
+                                    }
+
+                                    isRemovedFuture.complete(throwable == null);
+                                });
+                    }
+
+                    return null;
+                });
+            } finally {
+                busyLock.leaveBusy();
+            }
+        });
 
-        return completedFuture(false);
+        return isRemovedFuture;
     }
 
     /** {@inheritDoc} */
@@ -557,13 +597,12 @@ public class ReplicaManager implements IgniteComponent {
         );
 
         cmgMgr.metaStorageNodes().whenComplete((nodes, e) -> {
-                    if (e != null) {
-                        msNodes.completeExceptionally(e);
-                    } else {
-                        msNodes.complete(nodes);
-                    }
-                }
-        );
+            if (e != null) {
+                msNodes.completeExceptionally(e);
+            } else {
+                msNodes.complete(nodes);
+            }
+        });
 
         localNodeId = clusterNetSvc.topologyService().localMember().id();
     }
@@ -571,7 +610,7 @@ public class ReplicaManager implements IgniteComponent {
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
-        if (!metaStorageNodes.compareAndSet(false, true)) {
+        if (!stopGuard.compareAndSet(false, true)) {
             return;
         }
 
@@ -582,12 +621,10 @@ public class ReplicaManager implements IgniteComponent {
 
         assert replicas.values().stream().noneMatch(CompletableFuture::isDone)
                 : "There are replicas alive [replicas="
-                    + replicas.entrySet().stream().filter(e -> e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
+                + replicas.entrySet().stream().filter(e -> e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
 
         for (CompletableFuture<Replica> replicaFuture : replicas.values()) {
-            if (!replicaFuture.isDone()) {
-                replicaFuture.completeExceptionally(new NodeStoppingException());
-            }
+            replicaFuture.completeExceptionally(new NodeStoppingException());
         }
     }
 
@@ -607,7 +644,7 @@ public class ReplicaManager implements IgniteComponent {
      */
     private void sendReplicaUnavailableErrorResponse(
             String senderConsistentId,
-            @Nullable Long correlationId,
+            long correlationId,
             ReplicationGroupId groupId,
             @Nullable HybridTimestamp requestTimestamp
     ) {
@@ -631,8 +668,8 @@ public class ReplicaManager implements IgniteComponent {
                             .errorReplicaResponse()
                             .throwable(
                                     new ReplicaUnavailableException(
-                                        groupId,
-                                        clusterNetSvc.topologyService().localMember())
+                                            groupId,
+                                            clusterNetSvc.topologyService().localMember())
                             )
                             .build(),
                     correlationId);
@@ -642,7 +679,7 @@ public class ReplicaManager implements IgniteComponent {
     /**
      * Sends await replica response.
      */
-    private void sendAwaitReplicaResponse(String senderConsistentId, @Nullable Long correlationId) {
+    private void sendAwaitReplicaResponse(String senderConsistentId, long correlationId) {
         clusterNetSvc.messagingService().respond(
                 senderConsistentId,
                 REPLICA_MESSAGES_FACTORY
diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
new file mode 100644
index 0000000000..d149b7a55b
--- /dev/null
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
+import static org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link ReplicaManager}, covering the local replica events it fires on replica start and stop.
+ */
+@ExtendWith(MockitoExtension.class)
+public class ReplicaManagerTest extends BaseIgniteAbstractTest {
+    private ReplicaManager replicaManager;
+
+    @BeforeEach
+    void startReplicaManager(
+            TestInfo testInfo,
+            @Mock ClusterService clusterService,
+            @Mock ClusterManagementGroupManager cmgManager,
+            @Mock PlacementDriver placementDriver,
+            @Mock MessagingService messagingService,
+            @Mock TopologyService topologyService
+    ) {
+        String nodeName = testNodeName(testInfo, 0);
+
+        when(clusterService.messagingService()).thenReturn(messagingService);
+        when(clusterService.topologyService()).thenReturn(topologyService);
+
+        when(topologyService.localMember()).thenReturn(new ClusterNodeImpl(nodeName, nodeName, new NetworkAddress("foo", 0)));
+
+        when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of()));
+
+        var clock = new HybridClockImpl();
+
+        replicaManager = new ReplicaManager(nodeName, clusterService, cmgManager, clock, Set.of(), placementDriver);
+
+        replicaManager.start();
+    }
+
+    @AfterEach
+    void stopReplicaManager() throws Exception {
+        CompletableFuture<?>[] replicaStopFutures = replicaManager.startedGroups().stream()
+                .map(id -> {
+                    try {
+                        return replicaManager.stopReplica(id);
+                    } catch (NodeStoppingException e) {
+                        throw new AssertionError(e);
+                    }
+                })
+                .toArray(CompletableFuture[]::new);
+
+        assertThat(allOf(replicaStopFutures), willCompleteSuccessfully());
+
+        replicaManager.stop();
+    }
+
+    /**
+     * Tests that {@link ReplicaManager} fires {@code AFTER_REPLICA_STARTED} on replica start and {@code BEFORE_REPLICA_STOPPED} on stop.
+     */
+    @Test
+    void testReplicaEvents(
+            @Mock EventListener<LocalReplicaEventParameters> createReplicaListener,
+            @Mock EventListener<LocalReplicaEventParameters> removeReplicaListener,
+            @Mock ReplicaListener replicaListener,
+            @Mock TopologyAwareRaftGroupService raftGroupService
+    ) throws NodeStoppingException {
+        when(raftGroupService.unsubscribeLeader()).thenReturn(completedFuture(null));
+
+        when(createReplicaListener.notify(any(), any())).thenReturn(completedFuture(false));
+        when(removeReplicaListener.notify(any(), any())).thenReturn(completedFuture(false));
+
+        replicaManager.listen(AFTER_REPLICA_STARTED, createReplicaListener);
+        replicaManager.listen(BEFORE_REPLICA_STOPPED, removeReplicaListener);
+
+        var groupId = new TablePartitionId(0, 0);
+
+        CompletableFuture<Replica> startReplicaFuture = replicaManager.startReplica(
+                groupId,
+                completedFuture(null),
+                replicaListener,
+                raftGroupService,
+                new PendingComparableValuesTracker<>(0L)
+        );
+
+        assertThat(startReplicaFuture, willCompleteSuccessfully());
+
+        var expectedParams = new LocalReplicaEventParameters(groupId);
+
+        verify(createReplicaListener).notify(eq(expectedParams), isNull());
+        verify(removeReplicaListener, never()).notify(any(), any());
+
+        CompletableFuture<Boolean> stopReplicaFuture = replicaManager.stopReplica(groupId);
+
+        assertThat(stopReplicaFuture, willBe(true));
+
+        verify(createReplicaListener).notify(eq(expectedParams), isNull()); // Stopping must not fire another start event.
+        verify(removeReplicaListener).notify(eq(expectedParams), isNull());
+    }
+}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 2fcabab70e..84ef75e717 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -17,13 +17,16 @@
 
 package org.apache.ignite.distributed;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.waitForTopology;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -80,7 +83,6 @@ import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.ColumnsExtractor;
@@ -726,11 +728,7 @@ public class ItTxTestCluster {
      * @throws Exception If failed.
      */
     public void shutdownCluster() throws Exception {
-        cluster.parallelStream().map(c -> {
-            c.stop();
-            return null;
-        }).forEach(o -> {
-        });
+        cluster.parallelStream().forEach(ClusterService::stop);
 
         if (client != null) {
             client.stop();
@@ -746,13 +744,25 @@ public class ItTxTestCluster {
 
                 ReplicaManager replicaMgr = replicaManagers.get(entry.getKey());
 
-                for (ReplicationGroupId grp : replicaMgr.startedGroups()) {
-                    replicaMgr.stopReplica(grp).join();
-                }
+                CompletableFuture<?>[] replicaStopFutures = replicaMgr.startedGroups().stream()
+                        .map(grp -> {
+                            try {
+                                return replicaMgr.stopReplica(grp);
+                            } catch (NodeStoppingException e) {
+                                throw new AssertionError(e);
+                            }
+                        })
+                        .toArray(CompletableFuture[]::new);
 
-                for (RaftNodeId nodeId : rs.localNodes()) {
-                    rs.stopRaftNode(nodeId);
-                }
+                assertThat(allOf(replicaStopFutures), willCompleteSuccessfully());
+
+                rs.localNodes().parallelStream().forEach(nodeId -> {
+                    try {
+                        rs.stopRaftNode(nodeId);
+                    } catch (NodeStoppingException e) {
+                        throw new AssertionError(e);
+                    }
+                });
 
                 replicaMgr.stop();
                 rs.stop();