You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2020/05/06 16:53:09 UTC
[ignite] branch master updated: IGNITE-12981: Fix pme-free snapshot
exchange if coordinator left the cluster (#7770)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6ddb17f IGNITE-12981: Fix pme-free snapshot exchange if coordinator left the cluster (#7770)
6ddb17f is described below
commit 6ddb17fb9e346f085d7ef981442e8b62d09ee3fa
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Wed May 6 19:52:48 2020 +0300
IGNITE-12981: Fix pme-free snapshot exchange if coordinator left the cluster (#7770)
---
.../preloader/GridDhtPartitionsExchangeFuture.java | 41 ++++-
.../snapshot/IgniteSnapshotManager.java | 2 +-
.../snapshot/IgniteClusterSnapshotSelfTest.java | 186 +++++++++++++++++++++
3 files changed, 222 insertions(+), 7 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e907c20..a8fc478 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -121,6 +121,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
@@ -800,8 +801,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
ExchangeType exchange;
if (exchCtx.exchangeFreeSwitch()) {
- exchange = isSnapshotOperation(firstDiscoEvt) ? onCustomMessageNoAffinityChange() :
- onExchangeFreeSwitchNodeLeft();
+ if (isSnapshotOperation(firstDiscoEvt)) {
+ // Keep the `rebalanced` state if the cluster was already rebalanced.
+ if (wasRebalanced())
+ markRebalanced();
+
+ if (!forceAffReassignment)
+ cctx.affinity().onCustomMessageNoAffinityChange(this, exchActions);
+
+ exchange = cctx.kernalContext().clientNode() ? ExchangeType.NONE : ExchangeType.ALL;
+ }
+ else
+ exchange = onExchangeFreeSwitchNodeLeft();
initCoordinatorCaches(newCrd);
}
@@ -1445,6 +1456,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private void clientOnlyExchange() throws IgniteCheckedException {
if (crd != null) {
assert !crd.isLocal() : crd;
+ assert !exchCtx.exchangeFreeSwitch() : this;
cctx.exchange().exchangerBlockingSectionBegin();
@@ -2166,6 +2178,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param oldestNode Oldest node. Target node to send message to.
*/
private void sendPartitions(ClusterNode oldestNode) {
+ assert !exchCtx.exchangeFreeSwitch() : this;
+
try {
sendLocalPartitions(oldestNode);
}
@@ -2946,7 +2960,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (log.isDebugEnabled())
log.debug("Single message will be handled on completion of exchange future: " + this);
- listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ listen(failureHandlerWrapper(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
if (cctx.kernalContext().isStopping())
return;
@@ -2963,8 +2977,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (finishState0 == null) {
- assert (firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient())
- || isSnapshotOperation(firstDiscoEvt) : GridDhtPartitionsExchangeFuture.this;
+ assert (firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient()) :
+ GridDhtPartitionsExchangeFuture.this;
ClusterNode node = cctx.node(nodeId);
@@ -2990,7 +3004,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
sendAllPartitionsToNode(finishState0, msg, nodeId);
}
- });
+ }));
+ }
+
+ /**
+ * Wraps the given closure so that any {@link Error} thrown while the closure is being applied is passed
+ * to the failure processor as a {@code CRITICAL_ERROR} and then rethrown.
+ *
+ * @param clsr Closure to wrap with failure handler.
+ * @return Wrapped closure.
+ */
+ private <T extends IgniteInternalFuture<?>> IgniteInClosure<T> failureHandlerWrapper(IgniteInClosure<T> clsr) {
+ // The guard has to live inside the returned closure: a try/catch around the creation of the wrapper
+ // only covers building the closure object and never sees errors raised later, when it is applied.
+ return (CI1<T>)fut -> {
+ try {
+ clsr.apply(fut);
+ }
+ catch (Error e) {
+ cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+ throw e;
+ }
+ };
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index ae8203a..99c2de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -1171,7 +1171,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(SnapshotStartDiscoveryMessage.class, this);
+ return S.toString(SnapshotStartDiscoveryMessage.class, this, super.toString());
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index 7fe818a..9fa1e58 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -21,9 +21,15 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.OpenOption;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -50,8 +56,10 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -63,6 +71,7 @@ import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.distributed.FullMessage;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
@@ -854,6 +863,183 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
assertSnapshotCacheKeys(snp.cache(ccfg2.getName()));
}
+ /**
+ * Starts a snapshot from grid 1, stops grid 0 while the exchange initialization on grids 1 and 2 is held,
+ * and checks that no {@link GridDhtPartitionsSingleMessage} is sent, that the snapshot future fails and that
+ * every exchange future on grid 1 reports the {@code rebalanced} state.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testClusterSnapshotCoordinatorStopped() throws Exception {
+ CountDownLatch block = new CountDownLatch(1);
+ startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+ startClientGrid(3);
+
+ awaitPartitionMapExchange();
+
+ // Hold the exchange initialization on the surviving server nodes until grid 0 has been stopped.
+ for (Ignite grid : Arrays.asList(grid(1), grid(2))) {
+ ((IgniteEx)grid).context().cache().context().exchange()
+ .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+ /** {@inheritDoc} */
+ @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ try {
+ block.await();
+ }
+ catch (InterruptedException e) {
+ fail("Must not catch exception here: " + e.getMessage());
+ }
+ }
+ });
+ }
+
+ // Any single message a node tries to send is blocked and recorded, so it can be detected below.
+ for (Ignite grid : G.allGrids()) {
+ TestRecordingCommunicationSpi.spi(grid)
+ .blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+ }
+
+ IgniteFuture<Void> fut = grid(1).snapshot().createSnapshot(SNAPSHOT_NAME);
+
+ stopGrid(0);
+
+ block.countDown();
+
+ // Two exchanges happen here: the snapshot one and the node-left one (PME-free).
+ // Neither of them requires sending messages between nodes.
+ assertFalse("Pme-free switch doesn't expect messaging exchanging between nodes",
+ GridTestUtils.waitForCondition(() -> {
+ boolean hasMsgs = false;
+
+ for (Ignite g : G.allGrids())
+ hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages();
+
+ return hasMsgs;
+ }, 5_000));
+
+ assertThrowsWithCause((Callable<Object>)fut::get, IgniteException.class);
+
+ List<GridDhtPartitionsExchangeFuture> exchFuts =
+ grid(1).context().cache().context().exchange().exchangeFutures();
+
+ assertFalse("Exchanges cannot be empty due to snapshot and node left happened",
+ exchFuts.isEmpty());
+
+ for (GridDhtPartitionsExchangeFuture exch : exchFuts) {
+ // Separator added so the future's description does not run into the message text.
+ assertTrue("Snapshot and node left events must keep `rebalanced` state: " + exch,
+ exch.rebalanced());
+ }
+ }
+
+ /**
+ * Starts a snapshot while supply messages are blocked and grid 0 is being stopped, then checks that
+ * the snapshot future fails and that every recorded exchange reported {@code rebalanced == false}.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testClusterSnapshotOnMovingPartitionsCoordinatorLeft() throws Exception {
+ startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+ // Block supply messages on the already started grids before a new node is added to the baseline.
+ for (Ignite grid : G.allGrids()) {
+ TestRecordingCommunicationSpi.spi(grid)
+ .blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+ }
+
+ Ignite ignite = startGrid(2);
+
+ ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+ // Proceed only after grid 0 has at least one blocked supply message.
+ TestRecordingCommunicationSpi.spi(grid(0))
+ .waitForBlocked();
+
+ // One count per running grid: grid 0 is stopped only after every grid has entered the exchange callback.
+ CountDownLatch latch = new CountDownLatch(G.allGrids().size());
+ IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(() -> {
+ try {
+ U.await(latch);
+
+ stopGrid(0);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ fail("Must not fail here: " + e.getMessage());
+ }
+ });
+
+ // Pairs of (exchange id, rebalanced flag) captured at the start of each exchange on each grid.
+ Queue<T2<GridDhtPartitionExchangeId, Boolean>> exchFuts = new ConcurrentLinkedQueue<>();
+
+ for (Ignite ig : G.allGrids()) {
+ ((IgniteEx)ig).context().cache().context().exchange()
+ .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+ /** {@inheritDoc} */
+ @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ try {
+ // Record the state first, then signal arrival and wait until grid 0 has been stopped.
+ exchFuts.add(new T2<>(fut.exchangeId(), fut.rebalanced()));
+ latch.countDown();
+
+ stopFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.log(log, "Interrupted on coordinator: " + e.getMessage());
+ }
+ }
+ });
+ }
+
+ IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+ stopFut.get();
+
+ assertThrowsAnyCause(log,
+ fut::get,
+ IgniteException.class,
+ "Snapshot creation has been finished with an error");
+
+ // NOTE(review): 3 presumably equals one recorded exchange per grid alive at snapshot start - confirm.
+ assertEquals("Snapshot futures expected: " + exchFuts, 3, exchFuts.size());
+
+ for (T2<GridDhtPartitionExchangeId, Boolean> exch : exchFuts)
+ assertFalse("Snapshot `rebalanced` must be false with moving partitions: " + exch.get1(), exch.get2());
+ }
+
+ /**
+ * Checks that the {@link PartitionsExchangeAware} callbacks are invoked in the expected order during
+ * a snapshot exchange, and that the cluster can afterwards be restarted from the created snapshot.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testSnapshotPartitionExchangeAwareOrder() throws Exception {
+ IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+ // Registered components by local node id, kept so that they can be unregistered after the snapshot.
+ Map<UUID, PartitionsExchangeAware> comps = new HashMap<>();
+
+ for (Ignite ig : G.allGrids()) {
+ PartitionsExchangeAware comp;
+
+ ((IgniteEx)ig).context().cache().context().exchange()
+ .registerExchangeAwareComponent(comp = new PartitionsExchangeAware() {
+ // Position of the next expected callback: 0, 1, 2, 3, then reset to 0 by the last callback.
+ private final AtomicInteger order = new AtomicInteger();
+
+ @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ assertEquals("Exchange order violated: " + fut.firstEvent(), 0, order.getAndIncrement());
+ }
+
+ @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ assertEquals("Exchange order violated: " + fut.firstEvent(), 1, order.getAndIncrement());
+ }
+
+ @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ assertEquals("Exchange order violated: " + fut.firstEvent(), 2, order.getAndIncrement());
+ }
+
+ @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ assertEquals("Exchange order violated: " + fut.firstEvent(), 3, order.getAndSet(0));
+ }
+ });
+
+ comps.put(((IgniteEx)ig).localNode().id(), comp);
+ }
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ // Remove the order-checking components before any further exchanges take place.
+ for (Ignite ig : G.allGrids()) {
+ ((IgniteEx)ig).context().cache().context().exchange()
+ .unregisterExchangeAwareComponent(comps.get(((IgniteEx)ig).localNode().id()));
+ }
+
+ awaitPartitionMapExchange();
+
+ // NOTE(review): presumably a failed order assertion inside a callback stops the node - confirm.
+ assertEquals("Some of ignite instances failed during snapshot", 3, G.allGrids().size());
+
+ stopAllGrids();
+
+ IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME);
+
+ assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+ }
+
/**
* @param ignite Ignite instance.
* @param started Latch will be released when delta partition processing starts.