You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/15 21:38:22 UTC
[ignite] branch master updated: IGNITE-14528 Fixed race between
rebalance and checkpoint which led to assertion error in
GridDhtPartitionDemander$RebalanceFuture.ownPartitionsAndFinishFuture.
Fixes #9003
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3561e87 IGNITE-14528 Fixed race between rebalance and checkpoint which led to assertion error in GridDhtPartitionDemander$RebalanceFuture.ownPartitionsAndFinishFuture. Fixes #9003
3561e87 is described below
commit 3561e87a3490165c81353195bc2b2f84825226cd
Author: denis-chudov <mo...@gmail.com>
AuthorDate: Fri Apr 16 00:36:36 2021 +0300
IGNITE-14528 Fixed race between rebalance and checkpoint which led to assertion error in GridDhtPartitionDemander$RebalanceFuture.ownPartitionsAndFinishFuture. Fixes #9003
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../cache/GridCachePartitionExchangeManager.java | 10 +-
.../processors/cache/GridCachePreloader.java | 3 +-
.../cache/GridCachePreloaderAdapter.java | 2 +-
.../processors/cache/GridCacheProcessor.java | 2 +-
.../dht/preloader/FinishPreloadingTask.java | 21 ++-
.../dht/preloader/GridDhtPartitionDemander.java | 85 +++++----
.../dht/preloader/GridDhtPreloader.java | 4 +-
.../rebalancing/RebalanceStatisticsTest.java | 3 +-
.../persistence/db/wal/IgniteWalRebalanceTest.java | 195 ++++++++++++++++++++-
9 files changed, 268 insertions(+), 57 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1f27819..64bf456 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1278,9 +1278,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param topVer Topology version.
* @param grpId Group id.
+ * @param rebalanceId Rebalance id.
*/
- public void finishPreloading(AffinityTopologyVersion topVer, int grpId) {
- exchWorker.finishPreloading(topVer, grpId);
+ public void finishPreloading(AffinityTopologyVersion topVer, int grpId, long rebalanceId) {
+ exchWorker.finishPreloading(topVer, grpId, rebalanceId);
}
/**
@@ -3038,9 +3039,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param topVer Topology version.
* @param grpId Group id.
+ * @param rebalanceId Rebalance id.
*/
- void finishPreloading(AffinityTopologyVersion topVer, int grpId) {
- futQ.add(new FinishPreloadingTask(topVer, grpId));
+ void finishPreloading(AffinityTopologyVersion topVer, int grpId, long rebalanceId) {
+ futQ.add(new FinishPreloadingTask(topVer, grpId, rebalanceId));
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index c5b0b4a..4da69ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -237,6 +237,7 @@ public interface GridCachePreloader {
* Finish preloading for given topology version.
*
* @param topVer Topology version.
+ * @param rebalanceId Rebalance id.
*/
- public void finishPreloading(AffinityTopologyVersion topVer);
+ public void finishPreloading(AffinityTopologyVersion topVer, long rebalanceId);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 783f8e5..31d209b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -205,7 +205,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public void finishPreloading(AffinityTopologyVersion topVer) {
+ @Override public void finishPreloading(AffinityTopologyVersion topVer, long rebalanceId) {
// No-op.
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6b5e493..507a35a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -459,7 +459,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheGroupContext grp = cacheGroup(task0.groupId());
if (grp != null)
- grp.preloader().finishPreloading(task0.topologyVersion());
+ grp.preloader().finishPreloading(task0.topologyVersion(), task0.rebalanceId());
}
else
U.warn(log, "Unsupported custom exchange task: " + task);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java
index b0ae758..a931259 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java
@@ -24,22 +24,22 @@ import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerT
* A task for finishing preloading future in exchange worker thread.
*/
public class FinishPreloadingTask implements CachePartitionExchangeWorkerTask {
- /**
- * Topology version.
- */
+ /** Topology version. */
private final AffinityTopologyVersion topVer;
- /**
- * Group id.
- */
+ /** Group id. */
private final int grpId;
+ /** Rebalance id. */
+ private final long rebalanceId;
+
/**
* @param topVer Topology version.
*/
- public FinishPreloadingTask(AffinityTopologyVersion topVer, int grpId) {
+ public FinishPreloadingTask(AffinityTopologyVersion topVer, int grpId, long rebalanceId) {
this.grpId = grpId;
this.topVer = topVer;
+ this.rebalanceId = rebalanceId;
}
/**
@@ -62,4 +62,11 @@ public class FinishPreloadingTask implements CachePartitionExchangeWorkerTask {
public int groupId() {
return grpId;
}
+
+ /**
+ * @return Rebalance id.
+ */
+ public long rebalanceId() {
+ return rebalanceId;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 10b9969..c012b5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -1323,7 +1323,7 @@ public class GridDhtPartitionDemander {
if (isDone()) {
assert !result() : "Rebalance future was done, but partitions never requested [grp="
- + grp.cacheOrGroupName() + ", topVer=" + topologyVersion() + "]";
+ + grp.cacheOrGroupName() + ", topVer=" + topologyVersion() + ", rebalanceId=" + rebalanceId + ']';
return;
}
@@ -1348,7 +1348,7 @@ public class GridDhtPartitionDemander {
U.log(log, "Prepared rebalancing [grp=" + grp.cacheOrGroupName()
+ ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ", partitionsCount=" + parts.size()
- + ", topVer=" + topologyVersion() + "]");
+ + ", topVer=" + topologyVersion() + ", rebalanceId=" + rebalanceId + ']');
}
if (!parts.isEmpty()) {
@@ -1415,7 +1415,8 @@ public class GridDhtPartitionDemander {
", topVer=" + topVer +
", supplier=" + supplierNode.id() +
", fullPartitions=" + S.compact(parts.fullSet()) +
- ", histPartitions=" + S.compact(parts.historicalSet()) + ']');
+ ", histPartitions=" + S.compact(parts.historicalSet()) +
+ ", rebalanceId=" + rebalanceId + ']');
ctx.io().sendOrderedMessage(supplierNode, msg.topic(),
msg.convertIfNeeded(supplierNode.version()), grp.ioPolicy(), msg.timeout());
@@ -1443,36 +1444,11 @@ public class GridDhtPartitionDemander {
}
}
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
- if (super.onDone(res, err)) {
- if (!isInitial()) {
- sendRebalanceFinishedEvent();
-
- if (log.isInfoEnabled())
- log.info("Completed rebalance future: " + this);
-
- // Complete sync future only if rebalancing was not cancelled.
- if (res && !grp.preloader().syncFuture().isDone())
- ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
-
- if (isChainFinished())
- onChainFinished();
- }
-
- if (next != null)
- next.requestPartitions(); // Process next group.
-
- return true;
- }
-
- return false;
- }
-
/**
* @param topVer Rebalancing topology version.
+ * @param rebalanceId Rebalance id.
*/
- public void ownPartitionsAndFinishFuture(AffinityTopologyVersion topVer) {
+ public void ownPartitionsAndFinishFuture(AffinityTopologyVersion topVer, long rebalanceId) {
// Ignore all client exchanges.
// Note rebalancing may be started on client topology version if forced reassign was queued after client
// topology exchange.
@@ -1489,6 +1465,16 @@ public class GridDhtPartitionDemander {
return;
}
+ if (this.rebalanceId != rebalanceId) {
+ if (log.isInfoEnabled()) {
+ log.info("Received preloading task with wrong rebalanceId, ignoring it [rebalanceId=" +
+ this.rebalanceId + ", finishPreloadingTaskRebalanceId=" + rebalanceId + ", grpId=" +
+ grp.groupId() + ", topVer=" + topVer + ']');
+ }
+
+ return;
+ }
+
if (onDone(true, null)) {
assert state == RebalanceFutureState.STARTED : this;
@@ -1500,7 +1486,7 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled())
log.debug("Partitions have been scheduled to resend [reason=" +
- "Group durability restored, name=" + grp.cacheOrGroupName() + "]");
+ "Group durability restored, name=" + grp.cacheOrGroupName() + ']');
ctx.exchange().refreshPartitions(Collections.singleton(grp));
}
@@ -1534,11 +1520,17 @@ public class GridDhtPartitionDemander {
}
/**
- * Cancel running future or mark for cancel {@code RebalanceFutureState#MARK_CANCELLED}.
+ * Cancel running future or mark for cancel {@code RebalanceFutureState#MARK_CANCELLED}, if it has not started yet.
*/
private void tryCancel() {
- if (STATE_UPD.compareAndSet(this, RebalanceFutureState.INIT, RebalanceFutureState.MARK_CANCELLED))
+ if (STATE_UPD.compareAndSet(this, RebalanceFutureState.INIT, RebalanceFutureState.MARK_CANCELLED)) {
+ U.log(log, "Rebalancing marked as cancelled [grp=" + grp.cacheOrGroupName() +
+ ", topVer=" + topologyVersion() + ", rebalanceId=" + rebalanceId + ']');
+
+ // Don't call #cancel() for this future from INIT state, as it will trigger #requestPartitions()
+ // for #next future.
return;
+ }
cancel();
}
@@ -1559,7 +1551,7 @@ public class GridDhtPartitionDemander {
return true;
U.log(log, "Cancelled rebalancing from all nodes [grp=" + grp.cacheOrGroupName() +
- ", topVer=" + topologyVersion() + "]");
+ ", topVer=" + topologyVersion() + ", rebalanceId=" + rebalanceId + ']');
if (!ctx.kernalContext().isStopping()) {
for (UUID nodeId : remaining.keySet())
@@ -1590,6 +1582,23 @@ public class GridDhtPartitionDemander {
lastCancelledTime.accumulateAndGet(System.currentTimeMillis(), Math::max);
else if (startTime != -1)
endTime = System.currentTimeMillis();
+
+ if (log != null && log.isInfoEnabled() && !isInitial())
+ log.info("Completed rebalance future: " + this + (isFailed() ? ", error=" + err : ""));
+
+ if (!isInitial()) {
+ sendRebalanceFinishedEvent();
+
+ // Complete sync future only if rebalancing was not cancelled.
+ if (res && !grp.preloader().syncFuture().isDone())
+ ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
+
+ if (isChainFinished())
+ onChainFinished();
+ }
+
+ if (next != null)
+ next.requestPartitions(); // Process next group.
}
return byThisCall;
@@ -1757,7 +1766,7 @@ public class GridDhtPartitionDemander {
cp.onStateChanged(PAGE_SNAPSHOT_TAKEN, () -> grp.localWalEnabled(true, false));
cp.onStateChanged(FINISHED, () -> {
- ctx.exchange().finishPreloading(topVer, grp.groupId());
+ ctx.exchange().finishPreloading(topVer, grp.groupId(), rebalanceId);
});
}
else {
@@ -2001,7 +2010,7 @@ public class GridDhtPartitionDemander {
", fullEntries=" + fullEntries +
", fullBytesRcvd=" + U.humanReadableByteCount(fullBytes) +
", topVer=" + topologyVersion() +
- ", progress=" + (routines - remainingRoutines) + "/" + routines + "]");
+ ", progress=" + (routines - remainingRoutines) + "/" + routines + ']');
}
catch (Throwable t) {
U.error(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
@@ -2021,9 +2030,9 @@ public class GridDhtPartitionDemander {
/**
* @param topVer Topology version.
*/
- void finishPreloading(AffinityTopologyVersion topVer) {
+ void finishPreloading(AffinityTopologyVersion topVer, long rebalanceId) {
assert !rebalanceFut.isInitial() : topVer;
- rebalanceFut.ownPartitionsAndFinishFuture(topVer);
+ rebalanceFut.ownPartitionsAndFinishFuture(topVer, rebalanceId);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 925f7a8..0e6222d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -558,12 +558,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public void finishPreloading(AffinityTopologyVersion topVer) {
+ @Override public void finishPreloading(AffinityTopologyVersion topVer, long rebalanceId) {
if (!enterBusy())
return;
try {
- demander.finishPreloading(topVer);
+ demander.finishPreloading(topVer, rebalanceId);
}
finally {
leaveBusy();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java
index 9e02a72..304eea5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java
@@ -51,6 +51,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
@@ -180,7 +181,7 @@ public class RebalanceStatisticsTest extends GridCommonAbstractTest {
};
assertTrue(
- supplierMsgs.toString(),
+ "msgs=" + supplierMsgs.toString() + ", checVals=" + asList(checVals).toString(),
supplierMsgs.stream().anyMatch(s -> Stream.of(checVals).allMatch(s::contains))
);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 7567705..f43da2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -31,6 +31,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -64,7 +65,11 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.WalTestUtils;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
@@ -75,6 +80,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAhea
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -90,8 +96,11 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
/**
* Historical WAL rebalance base test.
@@ -140,8 +149,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
- .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
- );
+ .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE));
cfg.setDataStorageConfiguration(dbCfg);
@@ -978,6 +986,189 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
}
/**
+ * Tests that owning partitions (that are triggered by rebalance future) cannot be mapped to a new rebalance future
+ * that was created by RebalanceReassignExchangeTask.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceReassignAndOwnPartitions() throws Exception {
+ backups = 3;
+
+ IgniteEx supplier1 = startGrid(0);
+ IgniteEx supplier2 = startGrid(1);
+ IgniteEx demander = startGrid(2);
+
+ supplier1.cluster().state(ACTIVE);
+
+ String cacheName1 = "test-cache-1";
+ String cacheName2 = "test-cache-2";
+
+ IgniteCache<Integer, IndexedObject> c1 = supplier1.getOrCreateCache(
+ new CacheConfiguration<Integer, IndexedObject>(cacheName1)
+ .setBackups(backups)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setRebalanceOrder(10));
+
+ IgniteCache<Integer, IndexedObject> c2 = supplier1.getOrCreateCache(
+ new CacheConfiguration<Integer, IndexedObject>(cacheName2)
+ .setBackups(backups)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setRebalanceOrder(20));
+
+ // Fill initial data.
+ final int entryCnt = PARTS_CNT * 200;
+ final int preloadEntryCnt = PARTS_CNT * 400;
+
+ for (int k = 0; k < preloadEntryCnt; k++) {
+ c1.put(k, new IndexedObject(k));
+
+ c2.put(k, new IndexedObject(k));
+ }
+
+ forceCheckpoint();
+
+ stopGrid(2);
+
+ // Rewrite data to trigger further rebalance.
+ // Make sure that all partitions will be updated in order to disable wal locally for preloading.
+ // Updating entryCnt keys allows to trigger historical rebalance.
+ // This is an easy way to emulate missing partitions on the first rebalance.
+ for (int i = 0; i < entryCnt; i++)
+ c1.put(i, new IndexedObject(i));
+
+ // Full rebalance for the cacheName2.
+ for (int i = 0; i < preloadEntryCnt; i++)
+ c2.put(i, new IndexedObject(i));
+
+ // Delay rebalance process for specified groups.
+ blockMsgPred = (node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+ return msg0.groupId() == CU.cacheId(cacheName1) || msg0.groupId() == CU.cacheId(cacheName2);
+ }
+
+ return false;
+ };
+
+ // Emulate missing partitions and trigger RebalanceReassignExchangeTask which should re-trigger a new rebalance.
+ FailingIOFactory ioFactory = injectFailingIOFactory(supplier1);
+
+ demander = startGrid(2);
+
+ TestRecordingCommunicationSpi demanderSpi = TestRecordingCommunicationSpi.spi(grid(2));
+
+ // Wait until demander starts rebalancing.
+ demanderSpi.waitForBlocked();
+
+ // Need to start a client node in order to block RebalanceReassignExchangeTask (and do not change the affinity)
+ // until cacheName2 triggers a checkpoint after rebalancing.
+ CountDownLatch blockClientJoin = new CountDownLatch(1);
+ CountDownLatch unblockClientJoin = new CountDownLatch(1);
+
+ demander.context().cache().context().exchange().registerExchangeAwareComponent(new PartitionsExchangeAware() {
+ @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ blockClientJoin.countDown();
+
+ try {
+ if (!unblockClientJoin.await(getTestTimeout(), MILLISECONDS))
+ throw new IgniteException("Failed to wait for client node joinning the cluster.");
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException("Unexpected exception.", e);
+ }
+ }
+ });
+
+ startClientGrid(4);
+
+ // Wait for a checkpoint after rebalancing cacheName2.
+ CountDownLatch blockCheckpoint = new CountDownLatch(1);
+ CountDownLatch unblockCheckpoint = new CountDownLatch(1);
+
+ ((GridCacheDatabaseSharedManager) demander
+ .context()
+ .cache()
+ .context()
+ .database())
+ .addCheckpointListener(new CheckpointListener() {
+ /** {@inheritDoc} */
+ @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+ if (!ctx.progress().reason().contains(String.valueOf(CU.cacheId(cacheName2))))
+ return;
+
+ blockCheckpoint.countDown();
+
+ try {
+ if (!unblockCheckpoint.await(getTestTimeout(), MILLISECONDS))
+ throw new IgniteCheckedException("Failed to wait for unblocking checkpointer.");
+ }
+ catch (InterruptedException e) {
+ throw new IgniteCheckedException("Unexpected exception", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
+ }
+ });
+
+ // Unblock the first rebalance.
+ demanderSpi.stopBlock();
+
+ // Wait for start of the checkpoint after rebalancing cacheName2.
+ assertTrue("Failed to wait for checkpoint.", blockCheckpoint.await(getTestTimeout(), MILLISECONDS));
+
+ // Block the second rebalancing.
+ demanderSpi.blockMessages((node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+ return msg0.groupId() == CU.cacheId(cacheName1);
+ }
+
+ return false;
+ });
+
+ ioFactory.reset();
+
+ // Let's unblock client exchange and, therefore, handling of RebalanceReassignExchangeTask,
+ // which is already scheduled.
+ unblockClientJoin.countDown();
+
+ // Wait for starting the second rebalance (new chain of rebalance futures should be created at this point).
+ demanderSpi.waitForBlocked();
+
+ GridFutureAdapter checkpointFut = ((GridCacheDatabaseSharedManager) demander
+ .context()
+ .cache()
+ .context()
+ .database())
+ .getCheckpointer()
+ .currentProgress()
+ .futureFor(FINISHED);
+
+ // Unblock checkpointer.
+ unblockCheckpoint.countDown();
+
+ assertTrue(
+ "Failed to wait for a checkpoint.",
+ GridTestUtils.waitForCondition(() -> checkpointFut.isDone(), getTestTimeout()));
+
+ // Note: there is a race between unblocking the rebalance and the current checkpoint executing all its listeners.
+ demanderSpi.stopBlock();
+
+ awaitPartitionMapExchange(false, true, null);
+ }
+
+ /**
* Injects a new instance of FailingIOFactory into wal manager for the given supplier node.
* This allows to break historical rebalance from the supplier.
*