You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/11/09 09:00:44 UTC
(ignite-3) branch main updated: IGNITE-20692 Add local replica events (#2813)
This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9ac43bdb65 IGNITE-20692 Add local replica events (#2813)
9ac43bdb65 is described below
commit 9ac43bdb65b5a79aa8bb9fd62bef3c510d77dc0a
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Thu Nov 9 11:00:39 2023 +0200
IGNITE-20692 Add local replica events (#2813)
---
.../catalog/events/CatalogEventParameters.java | 4 +-
...tParameters.java => CausalEventParameters.java} | 6 +-
.../ignite/internal/event/EventParameters.java | 21 +-
.../ignite/internal/event/EventProducerTest.java | 2 +-
.../event/PrimaryReplicaEventParameters.java | 4 +-
.../internal/replicator/LocalReplicaEvent.java} | 27 +--
.../replicator/LocalReplicaEventParameters.java} | 42 ++--
.../ignite/internal/replicator/ReplicaManager.java | 221 ++++++++++++---------
.../internal/replicator/ReplicaManagerTest.java | 150 ++++++++++++++
.../apache/ignite/distributed/ItTxTestCluster.java | 34 ++--
10 files changed, 343 insertions(+), 168 deletions(-)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
index a7f8bcc515..fc71fe1475 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.catalog.events;
-import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.event.CausalEventParameters;
/**
* Base class for Catalog event parameters.
*/
-public abstract class CatalogEventParameters extends EventParameters {
+public abstract class CatalogEventParameters extends CausalEventParameters {
private final int catalogVersion;
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/event/CausalEventParameters.java
similarity index 86%
copy from modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
copy to modules/core/src/main/java/org/apache/ignite/internal/event/CausalEventParameters.java
index d52f76cc3c..b97e18fbe1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/event/CausalEventParameters.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.event;
-/** Event parameters. This type is passed to the {@link EventListener#notify(EventParameters, Throwable)}. */
-public abstract class EventParameters {
+/** {@link EventParameters} implementation that contains a Causality Token. */
+public abstract class CausalEventParameters implements EventParameters {
private final long causalityToken;
/**
@@ -26,7 +26,7 @@ public abstract class EventParameters {
*
* @param causalityToken Causality token.
*/
- public EventParameters(long causalityToken) {
+ public CausalEventParameters(long causalityToken) {
this.causalityToken = causalityToken;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
index d52f76cc3c..05641cbc2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/event/EventParameters.java
@@ -18,24 +18,5 @@
package org.apache.ignite.internal.event;
/** Event parameters. This type is passed to the {@link EventListener#notify(EventParameters, Throwable)}. */
-public abstract class EventParameters {
- private final long causalityToken;
-
- /**
- * Constructor.
- *
- * @param causalityToken Causality token.
- */
- public EventParameters(long causalityToken) {
- this.causalityToken = causalityToken;
- }
-
- /**
- * Returns a causality token.
- * The token is required for represent a causality dependency between several events.
- * The earlier the event occurred, the lower the value of the token.
- */
- public long causalityToken() {
- return causalityToken;
- }
+public interface EventParameters {
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java
index d60dc196e6..0c4d504406 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/event/EventProducerTest.java
@@ -120,7 +120,7 @@ public class EventProducerTest {
TEST
}
- private static class TestEventParameters extends EventParameters {
+ private static class TestEventParameters extends CausalEventParameters {
private TestEventParameters(long causalityToken) {
super(causalityToken);
}
diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
index ff6db744ab..197b774c3c 100644
--- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.placementdriver.event;
-import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.event.CausalEventParameters;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
/** Primary replica event parameters. There are properties which associate with a concrete primary replica. */
-public class PrimaryReplicaEventParameters extends EventParameters {
+public class PrimaryReplicaEventParameters extends CausalEventParameters {
private final ReplicationGroupId groupId;
private final String leaseholder;
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEvent.java
similarity index 56%
copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
copy to modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEvent.java
index a7f8bcc515..fbf891504d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEvent.java
@@ -15,32 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.catalog.events;
+package org.apache.ignite.internal.replicator;
-import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.event.Event;
/**
- * Base class for Catalog event parameters.
+ * Events produced by {@link ReplicaManager}.
*/
-public abstract class CatalogEventParameters extends EventParameters {
- private final int catalogVersion;
-
+public enum LocalReplicaEvent implements Event {
/**
- * Constructor.
- *
- * @param causalityToken Causality token.
- * @param catalogVersion Catalog version.
+ * Fired after a replica has been started on the local node.
*/
- public CatalogEventParameters(long causalityToken, int catalogVersion) {
- super(causalityToken);
-
- this.catalogVersion = catalogVersion;
- }
+ AFTER_REPLICA_STARTED,
/**
- * Returns catalog version.
+ * Fired before a replica is stopped on the local node.
*/
- public int catalogVersion() {
- return catalogVersion;
- }
+ BEFORE_REPLICA_STOPPED;
}
diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEventParameters.java
similarity index 56%
copy from modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
copy to modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEventParameters.java
index ff6db744ab..b7be9f2ab4 100644
--- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/LocalReplicaEventParameters.java
@@ -15,38 +15,46 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.placementdriver.event;
+package org.apache.ignite.internal.replicator;
import org.apache.ignite.internal.event.EventParameters;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-/** Primary replica event parameters. There are properties which associate with a concrete primary replica. */
-public class PrimaryReplicaEventParameters extends EventParameters {
+/**
+ * Parameters of events produced by {@link ReplicaManager}.
+ */
+public class LocalReplicaEventParameters implements EventParameters {
+ /** ID of the replication group whose replica the event refers to. */
private final ReplicationGroupId groupId;
- private final String leaseholder;
-
/**
* Constructor.
*
- * @param causalityToken Causality token.
- * @param groupId Replication group ID.
- * @param leaseholder Leaseholder node consistent ID.
+ * @param groupId Replication Group ID.
*/
- public PrimaryReplicaEventParameters(long causalityToken, ReplicationGroupId groupId, String leaseholder) {
- super(causalityToken);
-
+ public LocalReplicaEventParameters(ReplicationGroupId groupId) {
this.groupId = groupId;
- this.leaseholder = leaseholder;
}
- /** Replication group ID. */
public ReplicationGroupId groupId() {
return groupId;
}
- /** Returns leaseholder node consistent ID. */
- public String leaseholder() {
- return leaseholder;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ LocalReplicaEventParameters that = (LocalReplicaEventParameters) o;
+
+ return groupId.equals(that.groupId);
+ }
+
+ @Override
+ public int hashCode() {
+ return groupId.hashCode();
}
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index b3677517ea..ae66627b35 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.replicator;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
+import static org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.io.IOException;
@@ -40,6 +43,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -66,9 +70,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ChannelType;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -79,11 +81,12 @@ import org.jetbrains.annotations.TestOnly;
/**
* Replica manager maintains {@link Replica} instances on an Ignite node.
- * Manager allows starting, stopping, getting a {@link Replica} by its unique id.
- * Only a single instance of the class exists in Ignite node.
- * This class allow to start/stop/get a replica.
+ *
+ * <p>Manager allows starting, stopping, getting a {@link Replica} by its unique id.
+ *
+ * <p>Only a single instance of the class exists in Ignite node.
*/
-public class ReplicaManager implements IgniteComponent {
+public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, LocalReplicaEventParameters> implements IgniteComponent {
/** Default Idle safe time propagation period for tests. */
public static final int DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS = 1000;
@@ -99,9 +102,9 @@ public class ReplicaManager implements IgniteComponent {
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
/** Prevents double stopping of the component. */
- private final AtomicBoolean metaStorageNodes = new AtomicBoolean();
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
- /** Meta storage service. */
+ /** Meta storage node names. */
private final CompletableFuture<Set<String>> msNodes = new CompletableFuture<>();
/** Cluster network service. */
@@ -220,10 +223,16 @@ public class ReplicaManager implements IgniteComponent {
return;
}
+ assert correlationId != null;
+
ReplicaRequest request = (ReplicaRequest) message;
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Failed to process replica request (the node is stopping) [request={}].", request);
+ }
+
+ return;
}
try {
@@ -236,24 +245,21 @@ public class ReplicaManager implements IgniteComponent {
if (!replicaFut.isDone()) {
replicaFut.whenComplete((createdReplica, ex) -> {
- if (ex != null) {
- clusterNetSvc.messagingService().respond(
- senderConsistentId,
- REPLICA_MESSAGES_FACTORY
- .errorReplicaResponse()
- .throwable(ex)
- .build(),
- correlationId);
- } else {
- createdReplica.ready().thenAccept(unused ->
- IgniteUtils.inBusyLock(
- busyLock,
- () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
- )
- );
- }
- }
- );
+ if (ex != null) {
+ clusterNetSvc.messagingService().respond(
+ senderConsistentId,
+ REPLICA_MESSAGES_FACTORY
+ .errorReplicaResponse()
+ .throwable(ex)
+ .build(),
+ correlationId);
+ } else {
+ createdReplica.ready().thenRun(() -> inBusyLock(
+ busyLock,
+ () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
+ ));
+ }
+ });
} else {
sendAwaitReplicaResponse(senderConsistentId, correlationId);
}
@@ -287,16 +293,16 @@ public class ReplicaManager implements IgniteComponent {
CompletableFuture<ReplicaResult> resFut = replica.processRequest(request, senderId);
- resFut.handle((res, ex) -> {
+ resFut.whenComplete((res, ex) -> {
NetworkMessage msg;
if (ex == null) {
msg = prepareReplicaResponse(sendTimestamp, res.result());
} else {
if (indicatesUnexpectedProblem(ex)) {
- LOG.warn("Failed to process replica request [request={}]", ex, request);
+ LOG.warn("Failed to process replica request [request={}].", ex, request);
} else {
- LOG.debug("Failed to process replica request [request={}]", ex, request);
+ LOG.debug("Failed to process replica request [request={}].", ex, request);
}
msg = prepareReplicaErrorResponse(sendTimestamp, ex);
@@ -317,27 +323,23 @@ public class ReplicaManager implements IgniteComponent {
if (ex == null && res.replicationFuture() != null) {
assert request instanceof PrimaryReplicaRequest;
- res.replicationFuture().handle((res0, ex0) -> {
+ res.replicationFuture().whenComplete((res0, ex0) -> {
NetworkMessage msg0;
LOG.debug("Sending delayed response for replica request [request={}]", request);
- if (ex == null) {
+ if (ex0 == null) {
msg0 = prepareReplicaResponse(sendTimestamp, res0);
} else {
- LOG.warn("Failed to process delayed response [request={}]", ex, request);
+ LOG.warn("Failed to process delayed response [request={}]", ex0, request);
- msg0 = prepareReplicaErrorResponse(sendTimestamp, ex);
+ msg0 = prepareReplicaErrorResponse(sendTimestamp, ex0);
}
// Using strong send here is important to avoid a reordering with a normal response.
clusterNetSvc.messagingService().send(senderConsistentId, ChannelType.DEFAULT, msg0);
-
- return null;
});
}
-
- return null;
});
} finally {
busyLock.leaveBusy();
@@ -354,7 +356,7 @@ public class ReplicaManager implements IgniteComponent {
* @param ex An exception
* @return True if this exception has thrown due to timeout or connection problem, false otherwise.
*/
- private static boolean isConnectivityRelatedException(Throwable ex) {
+ private static boolean isConnectivityRelatedException(@Nullable Throwable ex) {
if (ex instanceof ExecutionException || ex instanceof CompletionException) {
ex = ex.getCause();
}
@@ -367,10 +369,16 @@ public class ReplicaManager implements IgniteComponent {
return;
}
+ assert correlationId != null;
+
var msg = (PlacementDriverReplicaMessage) msg0;
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Failed to process placement driver message (the node is stopping) [msg={}].", msg);
+ }
+
+ return;
}
try {
@@ -382,7 +390,7 @@ public class ReplicaManager implements IgniteComponent {
if (ex == null) {
clusterNetSvc.messagingService().respond(senderConsistentId, response, correlationId);
} else if (!(unwrapCause(ex) instanceof NodeStoppingException)) {
- LOG.error("Failed to process placement driver message [msg={}]", ex, msg);
+ LOG.error("Failed to process placement driver message [msg={}].", ex, msg);
}
});
} finally {
@@ -396,7 +404,7 @@ public class ReplicaManager implements IgniteComponent {
* @param groupId Replication group id.
* @param redirectNodeId Node consistent id to redirect.
*/
- private void stopLeaseProlongation(ReplicationGroupId groupId, String redirectNodeId) {
+ private void stopLeaseProlongation(ReplicationGroupId groupId, @Nullable String redirectNodeId) {
LOG.info("The replica does not meet the requirements for the leaseholder [groupId={}, redirectNodeId={}]", groupId, redirectNodeId);
msNodes.thenAccept(nodeIds -> {
@@ -423,9 +431,10 @@ public class ReplicaManager implements IgniteComponent {
* @param raftClient Topology aware Raft client.
* @param storageIndexTracker Storage index tracker.
* @throws NodeStoppingException If node is stopping.
- * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with the same replication group id has already been started.
+ * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with the same replication group id has already been
+ * started.
*/
- public void startReplica(
+ public CompletableFuture<Replica> startReplica(
ReplicationGroupId replicaGrpId,
CompletableFuture<Void> whenReplicaReady,
ReplicaListener listener,
@@ -437,7 +446,7 @@ public class ReplicaManager implements IgniteComponent {
}
try {
- startReplicaInternal(replicaGrpId, whenReplicaReady, listener, raftClient, storageIndexTracker);
+ return startReplicaInternal(replicaGrpId, whenReplicaReady, listener, raftClient, storageIndexTracker);
} finally {
busyLock.leaveBusy();
}
@@ -452,7 +461,7 @@ public class ReplicaManager implements IgniteComponent {
* @param raftClient Topology aware Raft client.
* @param storageIndexTracker Storage index tracker.
*/
- private void startReplicaInternal(
+ private CompletableFuture<Replica> startReplicaInternal(
ReplicationGroupId replicaGrpId,
CompletableFuture<Void> whenReplicaReady,
ReplicaListener listener,
@@ -472,19 +481,31 @@ public class ReplicaManager implements IgniteComponent {
placementDriver
);
- replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
- if (replicaFut == null) {
+ CompletableFuture<Replica> replicaFuture = replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
+ if (existingReplicaFuture == null || existingReplicaFuture.isDone()) {
+ assert existingReplicaFuture == null || isCompletedSuccessfully(existingReplicaFuture);
+
return completedFuture(newReplica);
} else {
- if (replicaFut.isDone() && !replicaFut.isCancelled() && !replicaFut.isCompletedExceptionally()) {
- return completedFuture(newReplica);
- }
-
- replicaFut.complete(newReplica);
+ existingReplicaFuture.complete(newReplica);
- return replicaFut;
+ return existingReplicaFuture;
}
});
+
+ var eventParams = new LocalReplicaEventParameters(replicaGrpId);
+
+ return fireEvent(AFTER_REPLICA_STARTED, eventParams)
+ .exceptionally(e -> {
+ LOG.error("Error when notifying about AFTER_REPLICA_STARTED event.", e);
+
+ return null;
+ })
+ .thenCompose(v -> replicaFuture);
+ }
+
+ private static boolean isCompletedSuccessfully(CompletableFuture<?> future) {
+ return future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally();
}
/**
@@ -513,34 +534,53 @@ public class ReplicaManager implements IgniteComponent {
* @return True if the replica is found and closed, false otherwise.
*/
private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId replicaGrpId) {
- CompletableFuture<Replica> removed = replicas.remove(replicaGrpId);
-
- if (removed != null) {
- if (!removed.isDone()) {
- removed.completeExceptionally(new ReplicaStoppingException(
- replicaGrpId,
- clusterNetSvc.topologyService().localMember()
- ));
+ var isRemovedFuture = new CompletableFuture<Boolean>();
+
+ var eventParams = new LocalReplicaEventParameters(replicaGrpId);
+
+ fireEvent(BEFORE_REPLICA_STOPPED, eventParams).whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.error("Error when notifying about BEFORE_REPLICA_STOPPED event.", e);
}
- if (!removed.isCompletedExceptionally()) {
- return removed
- .thenCompose(Replica::shutdown)
- .handle((notUsed, throwable) -> {
- if (throwable == null) {
- return true;
- } else {
- LOG.error("Failed to stop replica [replicaGrpId={}]", throwable, replicaGrpId);
+ if (!busyLock.enterBusy()) {
+ isRemovedFuture.completeExceptionally(new NodeStoppingException());
- return false;
- }
- });
+ return;
}
- return completedFuture(true);
- }
+ try {
+ replicas.compute(replicaGrpId, (grpId, replicaFuture) -> {
+ if (replicaFuture == null) {
+ isRemovedFuture.complete(false);
+ } else if (!replicaFuture.isDone()) {
+ ClusterNode localMember = clusterNetSvc.topologyService().localMember();
+
+ replicaFuture.completeExceptionally(new ReplicaStoppingException(grpId, localMember));
+
+ isRemovedFuture.complete(true);
+ } else if (!isCompletedSuccessfully(replicaFuture)) {
+ isRemovedFuture.complete(true);
+ } else {
+ replicaFuture
+ .thenCompose(Replica::shutdown)
+ .whenComplete((notUsed, throwable) -> {
+ if (throwable != null) {
+ LOG.error("Failed to stop replica [replicaGrpId={}].", throwable, grpId);
+ }
+
+ isRemovedFuture.complete(throwable == null);
+ });
+ }
+
+ return null;
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
- return completedFuture(false);
+ return isRemovedFuture;
}
/** {@inheritDoc} */
@@ -557,13 +597,12 @@ public class ReplicaManager implements IgniteComponent {
);
cmgMgr.metaStorageNodes().whenComplete((nodes, e) -> {
- if (e != null) {
- msNodes.completeExceptionally(e);
- } else {
- msNodes.complete(nodes);
- }
- }
- );
+ if (e != null) {
+ msNodes.completeExceptionally(e);
+ } else {
+ msNodes.complete(nodes);
+ }
+ });
localNodeId = clusterNetSvc.topologyService().localMember().id();
}
@@ -571,7 +610,7 @@ public class ReplicaManager implements IgniteComponent {
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
- if (!metaStorageNodes.compareAndSet(false, true)) {
+ if (!stopGuard.compareAndSet(false, true)) {
return;
}
@@ -582,12 +621,10 @@ public class ReplicaManager implements IgniteComponent {
assert replicas.values().stream().noneMatch(CompletableFuture::isDone)
: "There are replicas alive [replicas="
- + replicas.entrySet().stream().filter(e -> e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
+ + replicas.entrySet().stream().filter(e -> e.getValue().isDone()).map(Entry::getKey).collect(toSet()) + ']';
for (CompletableFuture<Replica> replicaFuture : replicas.values()) {
- if (!replicaFuture.isDone()) {
- replicaFuture.completeExceptionally(new NodeStoppingException());
- }
+ replicaFuture.completeExceptionally(new NodeStoppingException());
}
}
@@ -607,7 +644,7 @@ public class ReplicaManager implements IgniteComponent {
*/
private void sendReplicaUnavailableErrorResponse(
String senderConsistentId,
- @Nullable Long correlationId,
+ long correlationId,
ReplicationGroupId groupId,
@Nullable HybridTimestamp requestTimestamp
) {
@@ -631,8 +668,8 @@ public class ReplicaManager implements IgniteComponent {
.errorReplicaResponse()
.throwable(
new ReplicaUnavailableException(
- groupId,
- clusterNetSvc.topologyService().localMember())
+ groupId,
+ clusterNetSvc.topologyService().localMember())
)
.build(),
correlationId);
@@ -642,7 +679,7 @@ public class ReplicaManager implements IgniteComponent {
/**
* Sends await replica response.
*/
- private void sendAwaitReplicaResponse(String senderConsistentId, @Nullable Long correlationId) {
+ private void sendAwaitReplicaResponse(String senderConsistentId, long correlationId) {
clusterNetSvc.messagingService().respond(
senderConsistentId,
REPLICA_MESSAGES_FACTORY
diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
new file mode 100644
index 0000000000..d149b7a55b
--- /dev/null
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
+import static org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link ReplicaManager}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class ReplicaManagerTest extends BaseIgniteAbstractTest {
+ private ReplicaManager replicaManager;
+
+ @BeforeEach
+ void startReplicaManager(
+ TestInfo testInfo,
+ @Mock ClusterService clusterService,
+ @Mock ClusterManagementGroupManager cmgManager,
+ @Mock PlacementDriver placementDriver,
+ @Mock MessagingService messagingService,
+ @Mock TopologyService topologyService
+ ) {
+ String nodeName = testNodeName(testInfo, 0);
+
+ when(clusterService.messagingService()).thenReturn(messagingService);
+ when(clusterService.topologyService()).thenReturn(topologyService);
+
+ when(topologyService.localMember()).thenReturn(new ClusterNodeImpl(nodeName, nodeName, new NetworkAddress("foo", 0)));
+
+ when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of()));
+
+ var clock = new HybridClockImpl();
+
+ replicaManager = new ReplicaManager(nodeName, clusterService, cmgManager, clock, Set.of(), placementDriver);
+
+ replicaManager.start();
+ }
+
+ @AfterEach
+ void stopReplicaManager() throws Exception {
+ CompletableFuture<?>[] replicaStopFutures = replicaManager.startedGroups().stream()
+ .map(id -> {
+ try {
+ return replicaManager.stopReplica(id);
+ } catch (NodeStoppingException e) {
+ throw new AssertionError(e);
+ }
+ })
+ .toArray(CompletableFuture[]::new);
+
+ assertThat(allOf(replicaStopFutures), willCompleteSuccessfully());
+
+ replicaManager.stop();
+ }
+
+ /**
+ * Tests that Replica Manager produces events when a Replica is started or stopped.
+ */
+ @Test
+ void testReplicaEvents(
+ @Mock EventListener<LocalReplicaEventParameters> createReplicaListener,
+ @Mock EventListener<LocalReplicaEventParameters> removeReplicaListener,
+ @Mock ReplicaListener replicaListener,
+ @Mock TopologyAwareRaftGroupService raftGroupService
+ ) throws NodeStoppingException {
+ when(raftGroupService.unsubscribeLeader()).thenReturn(completedFuture(null));
+
+ when(createReplicaListener.notify(any(), any())).thenReturn(completedFuture(false));
+ when(removeReplicaListener.notify(any(), any())).thenReturn(completedFuture(false));
+
+ replicaManager.listen(AFTER_REPLICA_STARTED, createReplicaListener);
+ replicaManager.listen(BEFORE_REPLICA_STOPPED, removeReplicaListener);
+
+ var groupId = new TablePartitionId(0, 0);
+
+ CompletableFuture<Replica> startReplicaFuture = replicaManager.startReplica(
+ groupId,
+ completedFuture(null),
+ replicaListener,
+ raftGroupService,
+ new PendingComparableValuesTracker<>(0L)
+ );
+
+ assertThat(startReplicaFuture, willCompleteSuccessfully());
+
+ var expectedCreateParams = new LocalReplicaEventParameters(groupId);
+
+ verify(createReplicaListener).notify(eq(expectedCreateParams), isNull());
+ verify(removeReplicaListener, never()).notify(any(), any());
+
+ CompletableFuture<Boolean> stopReplicaFuture = replicaManager.stopReplica(groupId);
+
+ assertThat(stopReplicaFuture, willBe(true));
+
+ verify(createReplicaListener).notify(eq(expectedCreateParams), isNull());
+ verify(removeReplicaListener).notify(eq(expectedCreateParams), isNull());
+ }
+}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 2fcabab70e..84ef75e717 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -17,13 +17,16 @@
package org.apache.ignite.distributed;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.apache.ignite.utils.ClusterServiceTestUtils.waitForTopology;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -80,7 +83,6 @@ import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.ColumnsExtractor;
@@ -726,11 +728,7 @@ public class ItTxTestCluster {
* @throws Exception If failed.
*/
public void shutdownCluster() throws Exception {
- cluster.parallelStream().map(c -> {
- c.stop();
- return null;
- }).forEach(o -> {
- });
+ cluster.parallelStream().forEach(ClusterService::stop);
if (client != null) {
client.stop();
@@ -746,13 +744,25 @@ public class ItTxTestCluster {
ReplicaManager replicaMgr = replicaManagers.get(entry.getKey());
- for (ReplicationGroupId grp : replicaMgr.startedGroups()) {
- replicaMgr.stopReplica(grp).join();
- }
+ CompletableFuture<?>[] replicaStopFutures = replicaMgr.startedGroups().stream()
+ .map(grp -> {
+ try {
+ return replicaMgr.stopReplica(grp);
+ } catch (NodeStoppingException e) {
+ throw new AssertionError(e);
+ }
+ })
+ .toArray(CompletableFuture[]::new);
- for (RaftNodeId nodeId : rs.localNodes()) {
- rs.stopRaftNode(nodeId);
- }
+ assertThat(allOf(replicaStopFutures), willCompleteSuccessfully());
+
+ rs.localNodes().parallelStream().forEach(nodeId -> {
+ try {
+ rs.stopRaftNode(nodeId);
+ } catch (NodeStoppingException e) {
+ throw new AssertionError(e);
+ }
+ });
replicaMgr.stop();
rs.stop();