You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/01/17 15:38:46 UTC
[ignite] branch master updated: IGNITE-12551 Partition desync if a partition is evicted then owned again and historically rebalanced. - Fixes #7270.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5e95f65 IGNITE-12551 Partition desync if a partition is evicted then owned again and historically rebalanced. - Fixes #7270.
5e95f65 is described below
commit 5e95f65bbc1d3c24a0d094745ab5790be712feac
Author: Aleksei Scherbakov <al...@gmail.com>
AuthorDate: Fri Jan 17 18:38:13 2020 +0300
IGNITE-12551 Partition desync if a partition is evicted then owned again and historically rebalanced. - Fixes #7270.
Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
.../cache/GridCachePartitionExchangeManager.java | 13 +++
.../cache/IgniteCacheOffheapManager.java | 2 +-
.../PartitionTxUpdateCounterDebugWrapper.java | 17 ++++
.../cache/PartitionTxUpdateCounterImpl.java | 2 +
.../dht/preloader/GridDhtPreloader.java | 4 +
.../dht/topology/GridDhtLocalPartition.java | 53 +++---------
.../dht/topology/GridDhtPartitionTopologyImpl.java | 31 ++++++-
.../dht/topology/PartitionsEvictManager.java | 6 +-
.../cache/transactions/IgniteTxHandler.java | 14 ++--
...acheRemoteMultiplePartitionReservationTest.java | 2 -
.../TxPartitionCounterStateAbstractTest.java | 31 ++++++-
...ounterStateConsistencyHistoryRebalanceTest.java | 98 ++++++++++++++++++++++
.../TxPartitionCounterStateConsistencyTest.java | 24 ++++--
...unterStateConsistencyVolatileRebalanceTest.java | 21 +++++
...ounterStateOnePrimaryTwoBackupsFailAllTest.java | 7 +-
15 files changed, 252 insertions(+), 73 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 67a62ca..eba10ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -279,6 +279,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** Histogram of blocking PME durations. */
private volatile HistogramMetric blockingDurationHistogram;
+ /** Delay before rebalancing code is start executing after exchange completion. For tests only. */
+ private volatile long rebalanceDelay;
+
/** Discovery listener. */
private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -2479,6 +2482,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * @param delay Rebalance delay.
+ */
+ public void rebalanceDelay(long delay) {
+ this.rebalanceDelay = delay;
+ }
+
+ /**
* For testing only.
*
* @return Current version to wait for.
@@ -3307,6 +3317,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
rebTopVer = NONE;
if (!cctx.kernalContext().clientNode() && rebTopVer.equals(NONE)) {
+ if (rebalanceDelay > 0)
+ U.sleep(rebalanceDelay);
+
assignsMap = new HashMap<>();
IgniteCacheSnapshotManager snp = cctx.snapshot();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index e73ad52..51fd9cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1086,7 +1086,7 @@ public interface IgniteCacheOffheapManager {
public void preload() throws IgniteCheckedException;
/**
- * Reset counters for partition.
+ * Reset counter for partition.
*/
void resetUpdateCounter();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java
index a63479b..3bf92aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java
@@ -198,4 +198,21 @@ public class PartitionTxUpdateCounterDebugWrapper extends PartitionTxUpdateCount
return updated;
}
+
+ @Override public synchronized void reset() {
+ SB sb = new SB();
+
+ sb.a("[op=reset" +
+ ", grpId=" + grp.groupId() +
+ ", partId=" + partId +
+ ", before=" + toString());
+
+ try {
+ super.reset();
+ }
+ finally {
+ log.debug(sb.a(", after=" + toString() +
+ ']').toString());
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
index 03293c8..06692d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
@@ -325,6 +325,8 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
/** {@inheritDoc} */
@Override public synchronized void reset() {
+ initCntr = 0;
+
cntr.set(0);
reserveCntr.set(0);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index fc2a1b6..2ab4d90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -239,6 +239,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
part.awaitDestroy();
part = top.localPartition(p, topVer, true);
+
+ assert part != null : "Partition was not created [grp=" + grp.name() + ", topVer=" + topVer + ", p=" + p + ']';
+
+ part.resetUpdateCounter();
}
assert part.state() == MOVING : "Partition has invalid state for rebalance " + aff.topologyVersion() + " " + part;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index a7175e0..a45aa92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -786,54 +786,24 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
- * @return {@code true} if evicting thread was added.
+ * @return {@code True} if evicting thread was added.
*/
private boolean addEvicting() {
- while (true) {
- int cnt = evictGuard.get();
-
- if (cnt != 0)
- return false;
-
- if (evictGuard.compareAndSet(cnt, cnt + 1))
- return true;
- }
+ return evictGuard.compareAndSet(0, 1);
}
/**
- * @return {@code true} if no thread evicting partition at the moment.
+ * Clears evicting flag.
*/
- private boolean clearEvicting() {
- boolean free;
-
- while (true) {
- int cnt = evictGuard.get();
-
- assert cnt > 0;
-
- if (evictGuard.compareAndSet(cnt, cnt - 1)) {
- free = cnt == 1;
-
- break;
- }
- }
-
- return free;
+ private void clearEvicting() {
+ evictGuard.set(0);
}
/**
- * @return {@code True} if partition is safe to destroy.
+ * @return {@code True} if partition is marked for destroy.
*/
public boolean markForDestroy() {
- while (true) {
- int cnt = evictGuard.get();
-
- if (cnt != 0)
- return false;
-
- if (evictGuard.compareAndSet(0, -1))
- return true;
- }
+ return evictGuard.compareAndSet(0, -1);
}
/**
@@ -947,10 +917,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
throw e;
}
finally {
- boolean free = clearEvicting();
+ clearEvicting();
- if (free)
- clearFuture.finish();
+ clearFuture.finish();
}
}
@@ -1108,9 +1077,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
- * Reset partition counters.
+ * Reset partition update counter.
*/
- public void resetCounters() {
+ public void resetUpdateCounter() {
store.resetUpdateCounter();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 50f8a19..5ee3243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -882,12 +882,20 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtLocalPartition loc = locParts.get(p);
if (loc == null || loc.state() == EVICTED) {
+ boolean recreate = false;
+
// Make sure that after eviction partition is destroyed.
- if (loc != null)
+ if (loc != null) {
loc.awaitDestroy();
+ recreate = true;
+ }
+
locParts.set(p, loc = partFactory.create(ctx, grp, p));
+ if (recreate)
+ loc.resetUpdateCounter();
+
long updCntr = cntrMap.updateCounter(p);
if (updCntr != 0)
@@ -914,15 +922,23 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
GridDhtLocalPartition part = locParts.get(p);
+ boolean recreate = false;
+
if (part != null) {
if (part.state() != EVICTED)
return part;
- else
+ else {
part.awaitDestroy();
+
+ recreate = true;
+ }
}
part = new GridDhtLocalPartition(ctx, grp, p, true);
+ if (recreate)
+ part.resetUpdateCounter();
+
locParts.set(p, part);
return part;
@@ -970,10 +986,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
boolean belongs = partitionLocalNode(p, topVer);
+ boolean recreate = false;
+
if (loc != null && state == EVICTED) {
// Make sure that after eviction partition is destroyed.
loc.awaitDestroy();
+ recreate = true;
+
locParts.set(p, loc = null);
if (!belongs) {
@@ -1000,6 +1020,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
locParts.set(p, loc = partFactory.create(ctx, grp, p));
+ if (recreate)
+ loc.resetUpdateCounter();
+
this.updateSeq.incrementAndGet();
created = true;
@@ -2208,7 +2231,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
// Reset counters to zero for triggering full rebalance.
- locPart.resetCounters();
+ locPart.resetUpdateCounter();
}
}
}
@@ -2833,8 +2856,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part == null)
continue;
- long updCntr = part.updateCounter();
long initCntr = part.initialUpdateCounter();
+ long updCntr = part.updateCounter();
if (skipZeros && initCntr == 0L && updCntr == 0L)
continue;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index 31399bd..3c4b1e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -416,10 +416,8 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
try {
boolean success = part.tryClear(grpEvictionCtx);
- if (success) {
- if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy())
- part.destroy();
- }
+ if (success && part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy())
+ part.destroy();
// Complete eviction future before schedule new to prevent deadlock with
// simultaneous eviction stopping and scheduling new eviction.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 42e3c19..5eee8ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -66,7 +66,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeVal
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture;
@@ -109,6 +108,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
import static org.apache.ignite.internal.util.lang.GridFunc.isEmpty;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
@@ -1767,8 +1767,12 @@ public class IgniteTxHandler {
try {
tx.addWrite(entry, ctx.deploy().globalLoader());
+ // Entry will be invalidated if a partition was moved to RENTING.
+ if (locPart.state() == RENTING)
+ continue;
+
if (txCounters != null) {
- Long cntr = txCounters.generateNextCounter(entry.cacheId(), entry.cached().partition());
+ Long cntr = txCounters.generateNextCounter(entry.cacheId(), part);
if (cntr != null) // Counter is null if entry is no-op.
entry.updateCounter(cntr);
@@ -1910,8 +1914,8 @@ public class IgniteTxHandler {
if (locPart != null && locPart.reserve()) {
try {
- // do not process renting partitions.
- if (locPart.state() == GridDhtPartitionState.RENTING) {
+ // Skip renting partitions.
+ if (locPart.state() == RENTING) {
tx.addInvalidPartition(ctx.cacheId(), part);
continue;
@@ -2308,7 +2312,7 @@ public class IgniteTxHandler {
if (part != null && part.reserve()) {
try {
- if (part.state() != GridDhtPartitionState.RENTING) { // Check is actual only for backup node.
+ if (part.state() != RENTING) { // Check is actual only for backup node.
long start = counter.initialCounter(i);
long delta = counter.updatesCount(i);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheRemoteMultiplePartitionReservationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheRemoteMultiplePartitionReservationTest.java
index 015f3ed..c6c269f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheRemoteMultiplePartitionReservationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheRemoteMultiplePartitionReservationTest.java
@@ -59,8 +59,6 @@ public class TxCrossCacheRemoteMultiplePartitionReservationTest extends GridComm
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setClientMode(igniteInstanceName.equals("client"));
-
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
cfg.setClientMode("client".equals(igniteInstanceName));
cfg.setCacheConfiguration(cacheConfiguration(CACHE1), cacheConfiguration(CACHE2));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
index c32bd6b..773ddde 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
@@ -60,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -147,11 +149,18 @@ public abstract class TxPartitionCounterStateAbstractTest extends GridCommonAbst
return cfg;
}
+ /**
+ * @return Partitions count.
+ */
+ protected int partitions() {
+ return PARTS_CNT;
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
- System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "false");
-
super.beforeTestsStarted();
+
+ System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "false");
}
/** {@inheritDoc} */
@@ -176,7 +185,7 @@ public abstract class TxPartitionCounterStateAbstractTest extends GridCommonAbst
ccfg.setBackups(backups);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setOnheapCacheEnabled(false);
- ccfg.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, partitions()));
return ccfg;
}
@@ -1096,6 +1105,22 @@ public abstract class TxPartitionCounterStateAbstractTest extends GridCommonAbst
}
}
+ /**
+ * Blocks tx recovery between all nodes.
+ */
+ protected void blockRecovery() {
+ for (Ignite grid : G.allGrids()) {
+ if (grid.configuration().isClientMode())
+ continue;
+
+ TestRecordingCommunicationSpi.spi(grid).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode clusterNode, Message msg) {
+ return msg instanceof GridCacheTxRecoveryRequest;
+ }
+ });
+ }
+ }
+
/** */
private enum TxState {
/** Prepare. */ PREPARE,
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
index 6cfa21b..ce23983 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
@@ -17,8 +17,15 @@
package org.apache.ignite.internal.processors.cache.transactions;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.Test;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
/**
@@ -26,4 +33,95 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_
*/
@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
public class TxPartitionCounterStateConsistencyHistoryRebalanceTest extends TxPartitionCounterStateConsistencyTest {
+ /** */
+ @Test
+ public void testConsistencyAfterBaselineNodeStopAndRemoval() throws Exception {
+ doTestConsistencyAfterBaselineNodeStopAndRemoval(0);
+ }
+
+ /** */
+ @Test
+ public void testConsistencyAfterBaselineNodeStopAndRemoval_WithRestart() throws Exception {
+ doTestConsistencyAfterBaselineNodeStopAndRemoval(1);
+ }
+
+ /** */
+ @Test
+ public void testConsistencyAfterBaselineNodeStopAndRemoval_WithRestartAndSkipCheckpoint() throws Exception {
+ doTestConsistencyAfterBaselineNodeStopAndRemoval(2);
+ }
+
+ /**
+ * Test a scenario when partition is evicted and owned again with non-zero initial and current counters.
+ * Such partition should not be historically rebalanced, otherwise only subset of data will be rebalanced.
+ */
+ private void doTestConsistencyAfterBaselineNodeStopAndRemoval(int mode) throws Exception {
+ backups = 2;
+
+ final int srvNodes = SERVER_NODES + 1;
+
+ IgniteEx prim = startGrids(srvNodes);
+
+ prim.cluster().active(true);
+
+ for (int p = 0; p < partitions(); p++) {
+ prim.cache(DEFAULT_CACHE_NAME).put(p, p);
+ prim.cache(DEFAULT_CACHE_NAME).put(p + PARTS_CNT, p * 2);
+ }
+
+ forceCheckpoint();
+
+ stopGrid(1); // topVer=5,0
+
+ awaitPartitionMapExchange();
+
+ resetBaselineTopology(); // topVer=5,1
+
+ awaitPartitionMapExchange();
+
+ forceCheckpoint(); // Will force GridCacheDataStore.exists=true mode after part store re-creation.
+
+ startGrid(1); // topVer=6,0
+
+ awaitPartitionMapExchange();
+
+ resetBaselineTopology(); // topVer=6,1
+
+ awaitPartitionMapExchange(true, true, null);
+
+ // Create counter difference with evicted partition so it's applicable for historical rebalancing.
+ for (int p = 0; p < partitions(); p++)
+ prim.cache(DEFAULT_CACHE_NAME).put(p + PARTS_CNT, p * 2 + 1);
+
+ stopGrid(1); // topVer=7,0
+
+ if (mode > 0) {
+ stopGrid(mode == 1, grid(2).name());
+ stopGrid(mode == 1, grid(3).name());
+
+ startGrid(2);
+ startGrid(3);
+ }
+
+ prim.context().cache().context().exchange().rebalanceDelay(500);
+
+ Random r = new Random();
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ final IgniteInternalFuture<?> fut = doRandomUpdates(r,
+ prim,
+ IntStream.range(0, 1000).boxed().collect(toList()),
+ prim.cache(DEFAULT_CACHE_NAME),
+ stop::get);
+
+ resetBaselineTopology(); // topVer=7,1
+
+ awaitPartitionMapExchange();
+
+ stop.set(true);
+ fut.get();
+
+ assertPartitionsSame(idleVerify(prim, DEFAULT_CACHE_NAME));
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 0fa28a7..32e2ad6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.IntStream;
@@ -189,7 +190,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
}
}, 1, "node-restarter");
- doRandomUpdates(r, client, primaryKeys, cache, stop).get(stop + 30_000);
+ doRandomUpdates(r, client, primaryKeys, cache, () -> U.currentTimeMillis() >= stop).get(stop + 30_000);
fut.get();
assertPartitionsSame(idleVerify(client, DEFAULT_CACHE_NAME));
@@ -253,7 +254,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
}
}, 1, "node-restarter");
- doRandomUpdates(r, prim, primaryKeys, cache, stop).get(stop + 30_000);
+ doRandomUpdates(r, prim, primaryKeys, cache, () -> U.currentTimeMillis() >= stop).get(stop + 30_000);
fut.get();
assertPartitionsSame(idleVerify(prim, DEFAULT_CACHE_NAME));
@@ -329,7 +330,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
}, 1, "node-restarter");
// Wait with timeout to avoid hanging suite.
- doRandomUpdates(r, prim, primaryKeys, cache, stop).get(stop + 30_000);
+ doRandomUpdates(r, prim, primaryKeys, cache, () -> U.currentTimeMillis() >= stop).get(stop + 30_000);
fut.get();
assertPartitionsSame(idleVerify(prim, DEFAULT_CACHE_NAME));
@@ -702,7 +703,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
// Put one key per partition.
try(IgniteDataStreamer<Object, Object> streamer = client.dataStreamer(DEFAULT_CACHE_NAME)) {
- for (int k = 0; k < PARTS_CNT; k++)
+ for (int k = 0; k < partitions(); k++)
streamer.addData(k, 0);
}
@@ -839,7 +840,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
IgniteCache<Object, Object> cache2 = client.getOrCreateCache(cacheConfiguration(DEFAULT_CACHE_NAME + "2"));
// Put one key per partition.
- for (int k = 0; k < PARTS_CNT; k++) {
+ for (int k = 0; k < partitions(); k++) {
cache.put(k, 0);
cache2.put(k, 0);
}
@@ -973,18 +974,23 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
* @param near Near node.
* @param primaryKeys Primary keys.
* @param cache Cache.
- * @param stop Time to stop.
+ * @param stopClo A closure providing stop condition.
* @return Finish future.
*/
- private IgniteInternalFuture<?> doRandomUpdates(Random r, Ignite near, List<Integer> primaryKeys,
- IgniteCache<Object, Object> cache, long stop) throws Exception {
+ protected IgniteInternalFuture<?> doRandomUpdates(
+ Random r,
+ Ignite near,
+ List<Integer> primaryKeys,
+ IgniteCache<Object, Object> cache,
+ BooleanSupplier stopClo
+ ) throws Exception {
LongAdder puts = new LongAdder();
LongAdder removes = new LongAdder();
final int max = 100;
return multithreadedAsync(() -> {
- while (U.currentTimeMillis() < stop) {
+ while (!stopClo.getAsBoolean()) {
int rangeStart = r.nextInt(primaryKeys.size() - max);
int range = 5 + r.nextInt(max - 5);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyVolatileRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyVolatileRebalanceTest.java
index ac3d0a0..b3ef2d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyVolatileRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyVolatileRebalanceTest.java
@@ -22,6 +22,8 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.junit.Ignore;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED;
+
/**
* Test partitions consistency in various scenarios when all rebalance is in-memory.
*/
@@ -32,6 +34,20 @@ public class TxPartitionCounterStateConsistencyVolatileRebalanceTest extends TxP
}
/** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "true");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ System.clearProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED);
+ }
+
+ /** {@inheritDoc} */
@Ignore
@Override public void testSingleThreadedUpdateOrder() throws Exception {
// Not applicable for volatile mode.
@@ -47,4 +63,9 @@ public class TxPartitionCounterStateConsistencyVolatileRebalanceTest extends TxP
@Override protected void forceCheckpoint(Collection<Ignite> nodes) throws IgniteCheckedException {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override protected int partitions() {
+ return 1024;
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
index d9fcd93..00e3256 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
@@ -297,10 +297,11 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest extends TxPa
}
if (backup == backup1) {
- // Stop all backups first or recovery will commit a transaction on backups.
+ blockRecovery();
+
stopGrid(skipCheckpoint, txTop.get2().get(0).name());
stopGrid(skipCheckpoint, txTop.get2().get(1).name());
- stopAllGrids();
+ stopAllGrids(); // Stop all remaining nodes.
}
return true;
@@ -320,7 +321,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest extends TxPa
System.setProperty(IGNITE_FAIL_NODE_ON_UNRECOVERABLE_PARTITION_INCONSISTENCY, "true");
try {
- // Start only backups.
+ // Start only backups in given order.
startGrid(txTop.get(PARTITION_ID).get2().get(backupsStartOrder[0]).name());
startGrid(txTop.get(PARTITION_ID).get2().get(backupsStartOrder[1]).name());