Posted to commits@ignite.apache.org by ir...@apache.org on 2019/06/21 18:56:54 UTC
[ignite] branch master updated: IGNITE-11867 Fix flaky test GridCacheRebalancingWithAsyncClearingTest#testCorrectRebalancingCurrentlyRentingPartitions
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ecec080 IGNITE-11867 Fix flaky test GridCacheRebalancingWithAsyncClearingTest#testCorrectRebalancingCurrentlyRentingPartitions
ecec080 is described below
commit ecec0801c67c9f8bd3ce234adbca665f876e03ad
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Fri Jun 21 21:56:30 2019 +0300
IGNITE-11867 Fix flaky test GridCacheRebalancingWithAsyncClearingTest#testCorrectRebalancingCurrentlyRentingPartitions
Signed-off-by: Ivan Rakov <ir...@apache.org>
---
.../dht/preloader/GridDhtPartitionDemander.java | 320 +++++++++++----------
.../preloader/GridDhtPartitionsExchangeFuture.java | 19 +-
.../dht/preloader/GridDhtPreloader.java | 6 +-
.../dht/topology/GridDhtLocalPartition.java | 45 ++-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 91 ++++--
.../dht/topology/PartitionsEvictManager.java | 4 +
.../GridCacheDatabaseSharedManager.java | 8 +-
.../cache/persistence/GridCacheOffheapManager.java | 2 +-
.../cache/persistence/file/FilePageStore.java | 9 +-
.../ignite/failure/IoomFailureHandlerTest.java | 7 +-
.../distributed/CacheRentingStateRepairTest.java | 138 ++++++++-
...sticOriginatingNodeFailureAbstractSelfTest.java | 4 +
...dCacheRebalancingWithAsyncClearingMvccTest.java | 5 +
.../GridCacheRebalancingWithAsyncClearingTest.java | 13 +-
...eRebalanceOnCachesStoppingOrDestroyingTest.java | 4 +-
.../junits/common/GridCommonAbstractTest.java | 12 +-
.../ignite/testsuites/IgnitePdsMvccTestSuite3.java | 6 +-
.../ignite/testsuites/IgnitePdsMvccTestSuite4.java | 4 +
18 files changed, 467 insertions(+), 230 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 1c17a6f..0cf5aba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheRebalanceMode;
@@ -319,9 +320,8 @@ public class GridDhtPartitionDemander {
metrics.clearRebalanceCounters();
for (GridDhtPartitionDemandMessage msg : assignments.values()) {
- for (Integer partId : msg.partitions().fullSet()) {
+ for (Integer partId : msg.partitions().fullSet())
metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId));
- }
CachePartitionPartialCountersMap histMap = msg.partitions().historicalMap();
@@ -523,6 +523,7 @@ public class GridDhtPartitionDemander {
if (log.isInfoEnabled())
log.info("Started rebalance routine [" + grp.cacheOrGroupName() +
+ ", topVer=" + fut.topologyVersion() +
", supplier=" + node.id() + ", topic=" + topicId +
", fullPartitions=" + S.compact(stripePartitions.get(topicId).fullSet()) +
", histPartitions=" + S.compact(stripePartitions.get(topicId).historicalSet()) + "]");
@@ -667,195 +668,204 @@ public class GridDhtPartitionDemander {
final RebalanceFuture fut = rebalanceFut;
- ClusterNode node = ctx.node(nodeId);
+ try {
+ fut.cancelLock.readLock().lock();
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+ ClusterNode node = ctx.node(nodeId);
- return;
- }
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
- // Topology already changed (for the future that supply message based on).
- if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId())) {
- if (log.isDebugEnabled())
- log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+ return;
+ }
- return;
- }
+ // Topology already changed (for the future that the supply message is based on).
+ if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId())) {
+ if (log.isDebugEnabled())
+ log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
- if (log.isDebugEnabled())
- log.debug("Received supply message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+ return;
+ }
- // Check whether there were error during supply message unmarshalling process.
- if (supplyMsg.classError() != null) {
- U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
- ". Supply message couldn't be unmarshalled: " + supplyMsg.classError());
+ if (log.isDebugEnabled())
+ log.debug("Received supply message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
- fut.cancel(nodeId);
+ // Check whether there was an error during the supply message unmarshalling process.
+ if (supplyMsg.classError() != null) {
+ U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
+ ". Supply message couldn't be unmarshalled: " + supplyMsg.classError());
- return;
- }
+ fut.cancel(nodeId);
- // Check whether there were error during supplying process.
- if (supplyMsg.error() != null) {
- U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
- "]. Supplier has failed with error: " + supplyMsg.error());
+ return;
+ }
- fut.cancel(nodeId);
+ // Check whether there was an error during the supplying process.
+ if (supplyMsg.error() != null) {
+ U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
+ "]. Supplier has failed with error: " + supplyMsg.error());
- return;
- }
+ fut.cancel(nodeId);
- final GridDhtPartitionTopology top = grp.topology();
+ return;
+ }
- if (grp.sharedGroup()) {
- for (GridCacheContext cctx : grp.caches()) {
- if (cctx.statisticsEnabled()) {
- long keysCnt = supplyMsg.keysForCache(cctx.cacheId());
+ final GridDhtPartitionTopology top = grp.topology();
- if (keysCnt != -1)
- cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
+ if (grp.sharedGroup()) {
+ for (GridCacheContext cctx : grp.caches()) {
+ if (cctx.statisticsEnabled()) {
+ long keysCnt = supplyMsg.keysForCache(cctx.cacheId());
- // Can not be calculated per cache.
- cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
+ if (keysCnt != -1)
+ cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
+
+ // Can not be calculated per cache.
+ cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
+ }
}
}
- }
- else {
- GridCacheContext cctx = grp.singleCacheContext();
+ else {
+ GridCacheContext cctx = grp.singleCacheContext();
- if (cctx.statisticsEnabled()) {
- if (supplyMsg.estimatedKeysCount() != -1)
- cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supplyMsg.estimatedKeysCount());
+ if (cctx.statisticsEnabled()) {
+ if (supplyMsg.estimatedKeysCount() != -1)
+ cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supplyMsg.estimatedKeysCount());
- cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
+ cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
+ }
}
- }
- try {
- AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+ try {
+ AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
- // Preload.
- for (Map.Entry<Integer, CacheEntryInfoCollection> e : supplyMsg.infos().entrySet()) {
- int p = e.getKey();
+ // Preload.
+ for (Map.Entry<Integer, CacheEntryInfoCollection> e : supplyMsg.infos().entrySet()) {
+ int p = e.getKey();
- if (aff.get(p).contains(ctx.localNode())) {
- GridDhtLocalPartition part;
+ if (aff.get(p).contains(ctx.localNode())) {
+ GridDhtLocalPartition part;
- try {
- part = top.localPartition(p, topVer, true);
- }
- catch (GridDhtInvalidPartitionException err) {
- assert !topVer.equals(top.lastTopologyChangeVersion());
-
- if (log.isDebugEnabled()) {
- log.debug("Failed to get partition for rebalancing [" +
- "grp=" + grp.cacheOrGroupName() +
- ", err=" + err +
- ", p=" + p +
- ", topVer=" + topVer +
- ", lastTopVer=" + top.lastTopologyChangeVersion() + ']');
+ try {
+ part = top.localPartition(p, topVer, true);
}
+ catch (GridDhtInvalidPartitionException err) {
+ assert !topVer.equals(top.lastTopologyChangeVersion());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to get partition for rebalancing [" +
+ "grp=" + grp.cacheOrGroupName() +
+ ", err=" + err +
+ ", p=" + p +
+ ", topVer=" + topVer +
+ ", lastTopVer=" + top.lastTopologyChangeVersion() + ']');
+ }
- continue;
- }
+ continue;
+ }
- assert part != null;
+ assert part != null;
- boolean last = supplyMsg.last().containsKey(p);
+ boolean last = supplyMsg.last().containsKey(p);
- if (part.state() == MOVING) {
- boolean reserved = part.reserve();
+ if (part.state() == MOVING) {
+ boolean reserved = part.reserve();
- assert reserved : "Failed to reserve partition [igniteInstanceName=" +
- ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']';
+ assert reserved : "Failed to reserve partition [igniteInstanceName=" +
+ ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']';
- part.lock();
+ part.lock();
- try {
- Iterator<GridCacheEntryInfo> infos = e.getValue().infos().iterator();
+ part.beforeApplyBatch(last);
- if (grp.mvccEnabled())
- mvccPreloadEntries(topVer, node, p, infos);
- else
- preloadEntries(topVer, node, p, infos);
+ try {
+ Iterator<GridCacheEntryInfo> infos = e.getValue().infos().iterator();
+
+ if (grp.mvccEnabled())
+ mvccPreloadEntries(topVer, node, p, infos);
+ else
+ preloadEntries(topVer, node, p, infos);
- // If message was last for this partition,
- // then we take ownership.
- if (last) {
- fut.partitionDone(nodeId, p, true);
+ // If message was last for this partition,
+ // then we take ownership.
+ if (last) {
+ fut.partitionDone(nodeId, p, true);
- if (log.isDebugEnabled())
- log.debug("Finished rebalancing partition: " +
- "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
+ if (log.isDebugEnabled())
+ log.debug("Finished rebalancing partition: " +
+ "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
+ }
+ }
+ finally {
+ part.unlock();
+ part.release();
}
}
- finally {
- part.unlock();
- part.release();
+ else {
+ if (last)
+ fut.partitionDone(nodeId, p, false);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (state is not MOVING): " +
+ "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
}
}
else {
- if (last)
- fut.partitionDone(nodeId, p, false);
+ fut.partitionDone(nodeId, p, false);
if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (state is not MOVING): " +
+ log.debug("Skipping rebalancing partition (affinity changed): " +
"[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
}
}
- else {
- fut.partitionDone(nodeId, p, false);
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (affinity changed): " +
- "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
+ // Only request partitions based on latest topology version.
+ for (Integer miss : supplyMsg.missed()) {
+ if (aff.get(miss).contains(ctx.localNode()))
+ fut.partitionMissed(nodeId, miss);
}
- }
- // Only request partitions based on latest topology version.
- for (Integer miss : supplyMsg.missed()) {
- if (aff.get(miss).contains(ctx.localNode()))
- fut.partitionMissed(nodeId, miss);
- }
+ for (Integer miss : supplyMsg.missed())
+ fut.partitionDone(nodeId, miss, false);
- for (Integer miss : supplyMsg.missed())
- fut.partitionDone(nodeId, miss, false);
+ GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+ supplyMsg.rebalanceId(),
+ supplyMsg.topologyVersion(),
+ grp.groupId());
- GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
- supplyMsg.rebalanceId(),
- supplyMsg.topologyVersion(),
- grp.groupId());
+ d.timeout(grp.preloader().timeout());
- d.timeout(grp.preloader().timeout());
-
- d.topic(rebalanceTopics.get(topicId));
+ d.topic(rebalanceTopics.get(topicId));
- if (!topologyChanged(fut) && !fut.isDone()) {
- // Send demand message.
- try {
- ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
- d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.preloader().timeout());
+ if (!topologyChanged(fut) && !fut.isDone()) {
+ // Send demand message.
+ try {
+ ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
+ d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.preloader().timeout());
- if (log.isDebugEnabled())
- log.debug("Send next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+ if (log.isDebugEnabled())
+ log.debug("Send next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Supplier has left [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
+ ", errMsg=" + e.getMessage() + ']');
+ }
}
- catch (ClusterTopologyCheckedException e) {
+ else {
if (log.isDebugEnabled())
- log.debug("Supplier has left [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
- ", errMsg=" + e.getMessage() + ']');
+ log.debug("Will not request next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
+ ", topChanged=" + topologyChanged(fut) + ", rebalanceFuture=" + fut + "]");
}
}
- else {
- if (log.isDebugEnabled())
- log.debug("Will not request next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
- ", topChanged=" + topologyChanged(fut) + ", rebalanceFuture=" + fut + "]");
+ catch (IgniteSpiException | IgniteCheckedException e) {
+ LT.error(log, e, "Error during rebalancing [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
+ ", err=" + e + ']');
}
}
- catch (IgniteSpiException | IgniteCheckedException e) {
- LT.error(log, e, "Error during rebalancing [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
- ", err=" + e + ']');
+ finally {
+ fut.cancelLock.readLock().unlock();
}
}
@@ -1207,11 +1217,17 @@ public class GridDhtPartitionDemander {
/** The number of rebalance routines. */
private final long routines;
+ /** Used to order rebalance cancellation and supply message processing; they should not overlap.
+ * Otherwise partition clearing could start on a still-rebalancing partition, resulting in eviction of
+ * a partition in OWNING state. */
+ private final ReentrantReadWriteLock cancelLock;
+
/**
* @param grp Cache group.
* @param assignments Assignments.
* @param log Logger.
- * @param rebalanceId Rebalance id.
+ *
+ * @param rebalanceId Rebalance id.
*/
RebalanceFuture(
CacheGroupContext grp,
@@ -1237,6 +1253,8 @@ public class GridDhtPartitionDemander {
this.rebalanceId = rebalanceId;
ctx = grp.shared();
+
+ cancelLock = new ReentrantReadWriteLock();
}
/**
@@ -1250,6 +1268,7 @@ public class GridDhtPartitionDemander {
this.log = null;
this.rebalanceId = -1;
this.routines = 0;
+ this.cancelLock = new ReentrantReadWriteLock();
}
/**
@@ -1280,24 +1299,33 @@ public class GridDhtPartitionDemander {
* @return {@code True}.
*/
@Override public boolean cancel() {
- synchronized (this) {
- if (isDone())
- return true;
+ try {
+ // Cancel lock is needed only for case when some message might be on the fly while rebalancing is
+ // cancelled.
+ cancelLock.writeLock().lock();
- U.log(log, "Cancelled rebalancing from all nodes [grp=" + grp.cacheOrGroupName() +
- ", topVer=" + topologyVersion() + "]");
+ synchronized (this) {
+ if (isDone())
+ return true;
- if (!ctx.kernalContext().isStopping()) {
- for (UUID nodeId : remaining.keySet())
- cleanupRemoteContexts(nodeId);
- }
+ U.log(log, "Cancelled rebalancing from all nodes [grp=" + grp.cacheOrGroupName() +
+ ", topVer=" + topologyVersion() + "]");
- remaining.clear();
+ if (!ctx.kernalContext().isStopping()) {
+ for (UUID nodeId : remaining.keySet())
+ cleanupRemoteContexts(nodeId);
+ }
- checkIsDone(true /* cancelled */);
- }
+ remaining.clear();
- return true;
+ checkIsDone(true /* cancelled */);
+ }
+
+ return true;
+ }
+ finally {
+ cancelLock.writeLock().unlock();
+ }
}
/**
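The heart of the fix is visible above: handleSupplyMessage() now runs under cancelLock.readLock(), while cancel() takes the writeLock(), so a late supply message can never interleave with cancellation and trigger clearing of a partition that has just moved to OWNING. A minimal sketch of that ordering pattern, using plain java.util.concurrent with illustrative names (not the Ignite API):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class RebalanceSession {
        private final ReentrantReadWriteLock cancelLock = new ReentrantReadWriteLock();

        private volatile boolean cancelled;

        /** Supply messages may be processed concurrently: the read lock is shared. */
        void handleSupplyMessage(Runnable applyBatch) {
            cancelLock.readLock().lock();

            try {
                if (cancelled)
                    return; // A late message for a cancelled rebalance is ignored.

                applyBatch.run(); // cancel() cannot proceed until all read locks are released.
            }
            finally {
                cancelLock.readLock().unlock();
            }
        }

        /** Cancellation waits for in-flight supply messages, so clearing never races with them. */
        void cancel() {
            cancelLock.writeLock().lock();

            try {
                cancelled = true;
                // Partition clearing may be scheduled here: no batch is being applied concurrently.
            }
            finally {
                cancelLock.writeLock().unlock();
            }
        }
    }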
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 319e6b8..103cf47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -352,8 +352,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** Discovery lag / Clocks discrepancy, calculated on coordinator when all single messages are received. */
private T2<Long, UUID> discoveryLag;
- /** TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11799 */
- private Map<Integer, Set<Integer>> clearingPartitions;
+ /** Partitions scheduled for historical rebalance for this topology version. */
+ private Map<Integer, Set<Integer>> histPartitions;
/**
* @param cctx Cache context.
@@ -1437,7 +1437,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.exchange().exchangerBlockingSectionEnd();
}
- clearingPartitions = new HashMap();
+ histPartitions = new HashMap();
timeBag.finishGlobalStage("WAL history reservation");
@@ -5046,21 +5046,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * If partition is clearing or already cleared we need full rebalance even if supplier is exists.
- * (it still could be used by other demanders)
- *
* @param grp Group.
* @param part Partition.
*/
- public boolean isClearingPartition(CacheGroupContext grp, int part) {
+ public boolean isHistoryPartition(CacheGroupContext grp, int part) {
if (!grp.persistenceEnabled())
return false;
synchronized (mux) {
- if (clearingPartitions == null)
+ if (histPartitions == null)
return false;
- Set<Integer> parts = clearingPartitions.get(grp.groupId());
+ Set<Integer> parts = histPartitions.get(grp.groupId());
return parts != null && parts.contains(part);
}
@@ -5070,12 +5067,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param grp Group.
* @param part Partition.
*/
- public void addClearingPartition(CacheGroupContext grp, int part) {
+ public void addHistoryPartition(CacheGroupContext grp, int part) {
if (!grp.persistenceEnabled())
return;
synchronized (mux) {
- clearingPartitions.computeIfAbsent(grp.groupId(), k -> new HashSet()).add(part);
+ histPartitions.computeIfAbsent(grp.groupId(), k -> new HashSet()).add(part);
}
}
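The renamed bookkeeping above is a per-group set of partition ids guarded by the exchange mutex. A reduced sketch of the same add/lookup pattern (a hypothetical class, not the Ignite one):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class HistoricalRebalanceRegistry {
        /** Exchange mutex stand-in. */
        private final Object mux = new Object();

        /** Group id -> partitions scheduled for historical rebalance. */
        private final Map<Integer, Set<Integer>> histParts = new HashMap<>();

        void addHistoryPartition(int grpId, int part) {
            synchronized (mux) {
                histParts.computeIfAbsent(grpId, k -> new HashSet<>()).add(part);
            }
        }

        boolean isHistoryPartition(int grpId, int part) {
            synchronized (mux) {
                Set<Integer> parts = histParts.get(grpId);

                return parts != null && parts.contains(part);
            }
        }
    }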
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a40d08d..b57e062 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -255,9 +255,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (part.reserve()) {
part.moving();
- if (exchFut != null)
- exchFut.addClearingPartition(grp, part.id());
-
part.clearAsync();
part.release();
@@ -282,8 +279,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
histSupplier = ctx.discovery().node(nodeId);
}
- // Clearing partition should always be fully reloaded.
- if (histSupplier != null && !exchFut.isClearingPartition(grp, p)) {
+ if (histSupplier != null && exchFut.isHistoryPartition(grp, p)) {
assert grp.persistenceEnabled();
assert remoteOwners(p, topVer).contains(histSupplier) : remoteOwners(p, topVer);
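Note the inverted predicate: previously any partition that was not clearing could take the historical supplier; now historical (WAL-based) rebalance is strictly opt-in via isHistoryPartition(), and everything else falls back to full state transfer. Schematically (names are illustrative):

    class SupplierChoice {
        enum Mode { HISTORICAL, FULL }

        /**
         * Before the fix: any non-clearing partition could take the historical supplier.
         * After the fix: only partitions explicitly marked for historical rebalance do.
         */
        static Mode choose(boolean histSupplierAvailable, boolean markedHistorical) {
            return histSupplierAvailable && markedHistorical ? Mode.HISTORICAL : Mode.FULL;
        }
    }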
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index a131a21..5e637e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -379,9 +380,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
// Make sure to remove exactly this entry.
removeEntry(entry);
-
- // Attempt to evict.
- tryContinueClearing();
}
/**
@@ -518,14 +516,12 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
// Decrement reservations.
if (this.state.compareAndSet(state, newState)) {
- // If no more reservations try to continue delayed renting or clearing process.
- if (reservations == 0) {
- if (delayedRenting)
- rent(true);
- else
- tryContinueClearing();
- }
+ // If there are no more reservations, try to continue delayed renting.
+ if (reservations == 0 && delayedRenting)
+ rent(true);
+ // Partition can only be reserved in OWNING state, so no further actions
+ // are required.
break;
}
}
@@ -697,6 +693,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @param updateSeq Update sequence.
*/
private void clearAsync0(boolean updateSeq) {
+ // Method is expected to be called from the exchange worker or a rebalancing thread when rebalancing is done.
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
@@ -747,7 +744,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
clear = true;
- clearAsync0(false);
+ GridDhtPartitionDemander.RebalanceFuture rebFut =
+ (GridDhtPartitionDemander.RebalanceFuture)grp.preloader().rebalanceFuture();
+
+ // Make sure current rebalance future finishes before clearing
+ // to avoid clearing currently rebalancing partition.
+ // NOTE: this invariant is not true for initial rebalance future.
+ if (rebFut.topologyVersion() != null && state0 == MOVING && !rebFut.isDone())
+ rebFut.listen(fut -> clearAsync0(false));
+ else
+ clearAsync0(false);
}
/**
@@ -907,6 +913,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* Only one thread is allowed to do such process concurrently.
* At the end of clearing method completes {@code clearFuture}.
*
+ * @param evictionCtx Eviction context.
+ *
* @return {@code false} if clearing is not started due to existing reservations.
* @throws NodeStoppingException If node is stopping.
*/
@@ -961,7 +969,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* Tries to continue delayed partition clearing.
*/
public void onUnlock() {
- tryContinueClearing();
+ // No-op.
}
/**
@@ -1144,7 +1152,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
CacheDataRow row = it0.next();
// Do not clear fresh rows in case of partition reloading.
- // This is required because updates are possible to moving partition which is currently cleared.
+ // This is required because normal updates can arrive for a moving partition which is currently being cleared.
if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear))
continue;
@@ -1451,6 +1459,15 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
+ * Called before the next batch is applied during rebalance. Currently used for tests.
+ *
+ * @param last {@code True} if last batch for partition.
+ */
+ public void beforeApplyBatch(boolean last) {
+ // No-op.
+ }
+
+ /**
* Removed entry holder.
*/
private static class RemovedEntryHolder {
@@ -1598,7 +1615,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
public void finish() {
synchronized (this) {
onDone();
- finished = true;
+ finished = true; // Marks the state when all future listeners have finished.
}
}
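The key change in rent() above is deferral: if a rebalance future for this topology version is still running, clearing is attached as a completion listener instead of starting immediately. The same deferral pattern expressed with a standard CompletableFuture, as an analogy only (Ignite uses its own future API):

    import java.util.concurrent.CompletableFuture;

    class DeferredClearing {
        /** Runs {@code clearTask} immediately, or once {@code rebalanceFut} completes. */
        static void clearWhenSafe(CompletableFuture<?> rebalanceFut, Runnable clearTask) {
            if (rebalanceFut != null && !rebalanceFut.isDone())
                rebalanceFut.whenComplete((res, err) -> clearTask.run()); // Defer past rebalance.
            else
                clearTask.run(); // No rebalance in progress: clear right away.
        }
    }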
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 012c30b..a831e78 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -152,6 +152,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** */
private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
+ /** Factory used for re-creating a partition during its lifecycle. */
+ private PartitionFactory partFactory;
+
/**
* @param ctx Cache shared context.
* @param grp Cache group.
@@ -173,6 +176,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
cntrMap = new CachePartitionFullCountersMap(locParts.length());
+
+ partFactory = (ctx1, grp1, id) -> new GridDhtLocalPartition(ctx1, grp1, id, false);
+ }
+
+ /**
+ * Sets the partition factory to use. Currently used for tests.
+ *
+ * @param factory Factory.
+ */
+ public void partitionFactory(PartitionFactory factory) {
+ this.partFactory = factory;
}
/** {@inheritDoc} */
@@ -494,15 +508,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
for (int p = 0; p < partitions; p++) {
if (node2part != null && node2part.valid()) {
if (localNode(p, aff)) {
- // This will make sure that all non-existing partitions
- // will be created in MOVING state.
- boolean existing = locParts.get(p) != null;
-
GridDhtLocalPartition locPart = getOrCreatePartition(p);
- if (existing && locPart.state() == MOVING && !locPart.isEmpty())
- exchFut.addClearingPartition(grp, p);
-
updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
}
}
@@ -781,19 +788,26 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", owners = " + owners + ']');
}
- // If partition was not still cleared yet start clearing if needed.
- // Important: avoid calling clearAsync multiple times in the same rebalance session
- // or bad things may happen depending on timing.
- if (exchFut.isClearingPartition(grp, p) && !locPart.isClearing() && !locPart.isEmpty())
- locPart.clearAsync();
+ // It's important to clear non-empty moving partitions before full rebalancing.
+ // Consider the scenario:
+ // Node1 has keys k1 and k2 in the same partition.
+ // Node2 started rebalancing from Node1.
+ // Node2 received k1, k2 and failed before moving the partition to OWNING state.
+ // Node1 removes k2 but the update has not been delivered to Node2 because of the failure.
+ // After a new full rebalance Node1 will only send k1 to Node2, causing a lost removal.
+ // NOTE: avoid calling clearAsync for a partition twice per topology version.
+ // TODO FIXME clearing is not always needed see IGNITE-11799
+ if (grp.persistenceEnabled() && !exchFut.isHistoryPartition(grp, locPart.id()) &&
+ !locPart.isClearing() && !locPart.isEmpty() && !grp.mvccEnabled())
+ locPart.clearAsync();
+ }
+ else
+ updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
}
- else
- updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
}
- }
- else {
- if (locPart != null) {
- GridDhtPartitionState state = locPart.state();
+ else {
+ if (locPart != null) {
+ GridDhtPartitionState state = locPart.state();
if (state == MOVING) {
locPart.rent(false);
@@ -869,7 +883,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (loc != null)
loc.awaitDestroy();
- locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, false));
+ locParts.set(p, loc = partFactory.create(ctx, grp, p));
long updCntr = cntrMap.updateCounter(p);
@@ -897,8 +911,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
GridDhtLocalPartition part = locParts.get(p);
- if (part != null && part.state() != EVICTED)
- return part;
+ if (part != null) {
+ if (part.state() != EVICTED)
+ return part;
+ else
+ part.awaitDestroy();
+ }
part = new GridDhtLocalPartition(ctx, grp, p, true);
@@ -977,7 +995,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer +
", this.topVer=" + this.readyTopVer + ']');
- locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, false));
+ locParts.set(p, loc = partFactory.create(ctx, grp, p));
this.updateSeq.incrementAndGet();
@@ -2312,8 +2330,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* Prevents ongoing renting if required.
*
* @param p Partition id.
- * @param clear If {@code true} partition have to be cleared before rebalance.
- * Required in case of full state transfer to handle removals on supplier.
+ * @param clear If {@code true} the partition has to be cleared before rebalance (full rebalance or rebalance restart
+ * after cancellation).
* @param exchFut Future related to partition state change.
*/
private GridDhtLocalPartition rebalancePartition(int p, boolean clear, GridDhtPartitionsExchangeFuture exchFut) {
@@ -2335,11 +2353,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part.state() != MOVING)
part.moving();
- if (clear) {
- exchFut.addClearingPartition(grp, part.id());
-
- part.clearAsync();
- }
+ if (!clear)
+ exchFut.addHistoryPartition(grp, part.id());
assert part.state() == MOVING : part;
@@ -2451,7 +2466,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partitions have been scheduled to resend [reason=" +
- "Evictions are done [grp" + grp.cacheOrGroupName() + "]");
+ "Evictions are done [grp=" + grp.cacheOrGroupName() + "]");
ctx.exchange().scheduleResendPartitions();
}
@@ -3103,4 +3118,20 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
throw new UnsupportedOperationException("remove");
}
}
+
+ /**
+ * Partition factory used for (re-)creating partitions during their lifecycle.
+ * Currently used in tests for overriding default partition behavior.
+ */
+ public interface PartitionFactory {
+ /**
+ * @param ctx Context.
+ * @param grp Group.
+ * @param id Partition id.
+ * @return New partition instance.
+ */
+ public GridDhtLocalPartition create(GridCacheSharedContext ctx,
+ CacheGroupContext grp,
+ int id);
+ }
}
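PartitionFactory is a plain functional interface, so a test can substitute partitions with instrumented subclasses (as CacheRentingStateRepairTest does below). A stripped-down sketch of the same seam, with generic types standing in for the Ignite ones:

    class PartitionSeam {
        /** Mirrors GridDhtPartitionTopologyImpl.PartitionFactory, reduced to an int id. */
        interface Factory {
            Partition create(int id);
        }

        static class Partition {
            final int id;

            Partition(int id) {
                this.id = id;
            }

            /** Hook invoked before a rebalance batch is applied; no-op in production. */
            void beforeApplyBatch(boolean last) {
                // No-op.
            }
        }

        /** Production default; tests may replace it to inject instrumented partitions. */
        private Factory factory = Partition::new;

        void partitionFactory(Factory factory) {
            this.factory = factory;
        }

        Partition createPartition(int id) {
            return factory.create(id);
        }
    }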
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index c458d3e..b294843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -412,8 +412,12 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
}
try {
+ assert part.state() != GridDhtPartitionState.OWNING : part;
+
boolean success = part.tryClear(grpEvictionCtx);
+ assert part.state() != GridDhtPartitionState.OWNING : part;
+
if (success) {
if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy())
part.destroy();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 935f403..1975c0f 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2401,8 +2401,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
log.info("Finished applying memory changes [changesApplied=" + applied +
", time=" + (U.currentTimeMillis() - start) + " ms]");
- assert applied.get() > 0;
-
finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr, exec);
}
@@ -2767,7 +2765,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int partId = cacheState.partitionByIndex(i);
byte state = cacheState.stateByIndex(i);
- partitionRecoveryStates.put(new GroupPartitionId(entry.getKey(), partId), (int)state);
+ // Ignore undefined state.
+ if (state != -1) {
+ partitionRecoveryStates.put(new GroupPartitionId(entry.getKey(), partId),
+ (int)state);
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index c5045ba..8207b2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -559,7 +559,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
ctx.database().checkpointReadUnlock();
}
}
- else if (recoverState != null && recoverState >= 0) { // Pre-create partition if having valid state.
+ else if (recoverState != null) { // Pre-create partition if it has a valid recovery state.
GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
updateState(part, recoverState);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index adaf118..8e8494b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -302,8 +302,15 @@ public class FilePageStore implements PageStore {
lock.writeLock().lock();
try {
- if (!inited)
+ if (!inited) {
+ if (fileIO != null) // Ensure the file is closed even if not initialized yet.
+ fileIO.close();
+
+ if (delete && exists())
+ Files.delete(pathProvider.apply().toAbsolutePath());
+
return;
+ }
fileIO.force();
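The stop() fix above closes the file handle and honors the delete flag even when the store was never fully initialized, avoiding leaked descriptors and orphaned partition files. A reduced sketch of the same cleanup order in plain NIO (illustrative, not the FilePageStore API):

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;

    class StoreCleanup {
        /** @param inited {@code True} if the store finished initialization. */
        static void stop(boolean inited, FileChannel fileIO, Path file, boolean delete) throws IOException {
            if (!inited) {
                if (fileIO != null) // Close the handle even if initialization never finished.
                    fileIO.close();

                if (delete && Files.exists(file))
                    Files.delete(file.toAbsolutePath());

                return;
            }

            fileIO.force(true); // Initialized path: flush before the regular shutdown sequence.
        }
    }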
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/IoomFailureHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/failure/IoomFailureHandlerTest.java
index 9f95f41..402c1a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/IoomFailureHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/IoomFailureHandlerTest.java
@@ -61,8 +61,13 @@ public class IoomFailureHandlerTest extends AbstractFailureHandlerTest {
dfltPlcCfg.setInitialSize(SIZE);
dfltPlcCfg.setMaxSize(SIZE);
- if (pds)
+ if (pds) {
+ // We need a longer failure detection timeout in PDS-enabled mode, or the checkpoint write lock can block a tx's
+ // checkpoint read lock for too long, causing the failure handler to trigger on slow hardware.
+ cfg.setFailureDetectionTimeout(30_000);
+
dfltPlcCfg.setPersistenceEnabled(true);
+ }
dsCfg.setDefaultDataRegionConfiguration(dfltPlcCfg);
dsCfg.setPageSize(PAGE_SIZE);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java
index 87402b6..77e72bb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java
@@ -28,10 +28,14 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -40,16 +44,21 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJU
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
- *
+ * Contains several test scenarios related to partition state transitions during its lifecycle.
*/
public class CacheRentingStateRepairTest extends GridCommonAbstractTest {
/** */
public static final int PARTS = 1024;
+ /** */
+ private static final String CLIENT = "client";
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ cfg.setClientMode(CLIENT.equals(igniteInstanceName));
+
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setAffinity(new RendezvousAffinityFunction(false, PARTS).setPartitions(64));
@@ -73,6 +82,7 @@ public class CacheRentingStateRepairTest extends GridCommonAbstractTest {
DataStorageConfiguration memCfg = new DataStorageConfiguration().setPageSize(1024)
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration().setPersistenceEnabled(true).setInitialSize(sz).setMaxSize(sz))
+ .setWalSegmentSize(8 * 1024 * 1024)
.setWalMode(WALMode.LOG_ONLY).setCheckpointFrequency(24L * 60 * 60 * 1000);
cfg.setDataStorageConfiguration(memCfg);
@@ -100,7 +110,7 @@ public class CacheRentingStateRepairTest extends GridCommonAbstractTest {
}
/**
- *
+ * Tests that a partition is properly evicted when the node is restarted in the middle of eviction.
*/
@Test
public void testRentingStateRepairAfterRestart() throws Exception {
@@ -194,6 +204,130 @@ public class CacheRentingStateRepairTest extends GridCommonAbstractTest {
}
}
+ /**
+ * Tests the partition is not cleared when rebalanced.
+ */
+ @Test
+ public void testRebalanceRentingPartitionAndServerNodeJoin() throws Exception {
+ testRebalanceRentingPartitionAndNodeJoin(false, 0);
+ }
+
+ /**
+ * Tests the partition is not cleared when rebalanced.
+ */
+ @Test
+ public void testRebalanceRentingPartitionAndClientNodeJoin() throws Exception {
+ testRebalanceRentingPartitionAndNodeJoin(true, 0);
+ }
+
+ /**
+ * Tests the partition is not cleared when rebalanced.
+ */
+ @Test
+ public void testRebalanceRentingPartitionAndServerNodeJoinWithDelay() throws Exception {
+ testRebalanceRentingPartitionAndNodeJoin(false, 5_000);
+ }
+
+ /**
+ * Tests the partition is not cleared when rebalanced.
+ */
+ @Test
+ public void testRebalanceRentingPartitionAndClientNodeJoinWithDelay() throws Exception {
+ testRebalanceRentingPartitionAndNodeJoin(true, 5_000);
+ }
+
+ /**
+ * @param client {@code True} for client node join.
+ * @param delay Delay.
+ *
+ * @throws Exception if failed.
+ */
+ private void testRebalanceRentingPartitionAndNodeJoin(boolean client, long delay) throws Exception {
+ try {
+ IgniteEx g0 = startGrids(2);
+
+ g0.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ List<Integer> parts = evictingPartitionsAfterJoin(g0, g0.cache(DEFAULT_CACHE_NAME), 20);
+
+ int delayEvictPart = parts.get(0);
+
+ List<Integer> keys = partitionKeys(g0.cache(DEFAULT_CACHE_NAME), delayEvictPart, 2_000, 0);
+
+ for (Integer key : keys)
+ g0.cache(DEFAULT_CACHE_NAME).put(key, key);
+
+ GridDhtPartitionTopologyImpl top = (GridDhtPartitionTopologyImpl)dht(g0.cache(DEFAULT_CACHE_NAME)).topology();
+
+ GridDhtLocalPartition part = top.localPartition(delayEvictPart);
+
+ assertNotNull(part);
+
+ // Prevent eviction.
+ part.reserve();
+
+ startGrid(2);
+
+ resetBaselineTopology();
+
+ part.release();
+
+ part.rent(false).get();
+
+ CountDownLatch l1 = new CountDownLatch(1);
+ CountDownLatch l2 = new CountDownLatch(1);
+
+ // Create race between processing of final supply message and partition clearing.
+ top.partitionFactory((ctx, grp, id) -> id != delayEvictPart ? new GridDhtLocalPartition(ctx, grp, id, false) :
+ new GridDhtLocalPartition(ctx, grp, id, false) {
+ @Override public void beforeApplyBatch(boolean last) {
+ if (last) {
+ l1.countDown();
+
+ U.awaitQuiet(l2);
+
+ if (delay > 0) // Delay rebalance finish to enforce race with clearing.
+ doSleep(delay);
+ }
+ }
+ });
+
+ stopGrid(2);
+
+ resetBaselineTopology(); // Trigger rebalance for delayEvictPart after eviction.
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ l1.await();
+
+ // Trigger partition clear on next topology version.
+ if (client)
+ startGrid(CLIENT);
+ else
+ startGrid(2);
+
+ l2.countDown(); // Finish partition rebalance after initiating clear.
+ }
+ catch (Exception e) {
+ fail(X.getFullStackTrace(e));
+ }
+ }
+ }, 1);
+
+ fut.get();
+
+ awaitPartitionMapExchange(true, true, null, true);
+
+ assertPartitionsSame(idleVerify(g0));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
cleanPersistenceDir();
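The test above orchestrates the race with two latches: the rebalance thread signals l1 when the last batch reaches beforeApplyBatch() and then blocks on l2, while the control thread triggers the topology change that schedules clearing and only then releases l2. The handshake in isolation, as a minimal sketch:

    import java.util.concurrent.CountDownLatch;

    class RaceHandshake {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch l1 = new CountDownLatch(1);
            CountDownLatch l2 = new CountDownLatch(1);

            Thread rebalancer = new Thread(() -> {
                l1.countDown(); // "Last batch reached the demander."

                try {
                    l2.await(); // Hold the rebalance open until the race is set up.
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // ... the last batch would be applied here ...
            });

            rebalancer.start();

            l1.await(); // Wait for the rebalance to reach the hook.

            // ... trigger the topology change that schedules partition clearing ...

            l2.countDown(); // Let the rebalance finish, racing with the clearing.

            rebalancer.join();
        }
    }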
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
index 0399a77..8186385 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
@@ -289,6 +289,8 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
}
}
+ awaitPartitionMapExchange();
+
for (Map.Entry<Integer, String> e : map.entrySet()) {
long cntr0 = -1;
@@ -453,6 +455,8 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
}
}
+ awaitPartitionMapExchange();
+
for (Map.Entry<Integer, String> e : map.entrySet()) {
long cntr0 = -1;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingMvccTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingMvccTest.java
index a8a9b4a..cadd25d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingMvccTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingMvccTest.java
@@ -26,4 +26,9 @@ public class GridCacheRebalancingWithAsyncClearingMvccTest extends GridCacheReba
@Override protected CacheAtomicityMode atomicityMode() {
return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
}
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return super.getTestTimeout() * 2; // Parent test generates a lot of data and is inherently slow in mvcc mode.
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java
index 8a6a962..db01b09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java
@@ -65,7 +65,7 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
- .setMaxSize(100L * 1024 * 1024))
+ .setMaxSize(300L * 1024 * 1024))
);
cfg.setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME)
@@ -128,7 +128,7 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
public void testPartitionClearingNotBlockExchange() throws Exception {
System.setProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, "1");
- IgniteEx ig = (IgniteEx) startGrids(3);
+ IgniteEx ig = startGrids(3);
ig.cluster().active(true);
// High number of keys triggers long partition eviction.
@@ -223,7 +223,7 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
*/
@Test
public void testCorrectRebalancingCurrentlyRentingPartitions() throws Exception {
- IgniteEx ignite = (IgniteEx) startGrids(3);
+ IgniteEx ignite = startGrids(3);
ignite.cluster().active(true);
// High number of keys triggers long partition eviction.
@@ -232,11 +232,10 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
try (IgniteDataStreamer<Integer, Integer> ds = ignite.dataStreamer(CACHE_NAME)) {
log.info("Writing initial data...");
- ds.allowOverwrite(true);
for (int k = 1; k <= keysCnt; k++) {
ds.addData(k, k);
- if (k % 10_000 == 0)
+ if (k % 50_000 == 0)
log.info("Written " + k + " entities.");
}
@@ -259,14 +258,12 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
// Started node should have partition in RENTING or EVICTED state.
startGrid(1);
- awaitPartitionMapExchange();
+ awaitPartitionMapExchange(true, true, null, true);
// Check no data loss.
for (int k = 1; k <= keysCnt; k++) {
Integer val = (Integer) ignite.cache(CACHE_NAME).get(k);
-
Assert.assertNotNull("Value for " + k + " is null", val);
-
Assert.assertEquals("Check failed for " + k + " = " + val, k, (int)val);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
index f3eb906..286de48 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -257,6 +258,7 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
.setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
.setCacheMode(CacheMode.REPLICATED)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setAffinity(new RendezvousAffinityFunction(false, 64))
).collect(Collectors.toList());
ig.getOrCreateCaches(configs);
@@ -264,7 +266,7 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
configs.forEach(cfg -> {
try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cfg.getName())) {
for (int i = 0; i < KEYS_SIZE; i++)
- streamer.addData(i, new byte[1024]);
+ streamer.addData(i, i);
streamer.flush();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index f2c6239..638e469 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -89,7 +89,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
@@ -130,6 +129,7 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -704,7 +704,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
!affNodes.get(0).equals(dht.context().affinity().primaryByPartition(p, readyVer));
if (affNodesCnt != ownerNodesCnt || !affNodes.containsAll(owners) ||
- (waitEvicts && loc != null && loc.state() != GridDhtPartitionState.OWNING) ||
+ (waitEvicts && loc != null && loc.state() != OWNING) ||
notPrimary) {
if (i % 50 == 0)
LT.warn(log(), "Waiting for topology map update [" +
@@ -794,7 +794,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
", locNode=" + g.cluster().localNode() + ']');
}
- if (entry.getValue() != GridDhtPartitionState.OWNING) {
+ if (entry.getValue() != OWNING) {
LT.warn(log(),
"Waiting for correct partition state part=" + entry.getKey()
+ ", should be OWNING [state=" + entry.getValue() + "], node=" +
@@ -2263,7 +2263,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
@Nullable GridDhtLocalPartition locPart =
internalCache(grid(gridName).cache(DEFAULT_CACHE_NAME)).context().topology().localPartition(partId);
- return locPart == null ? null : locPart.dataStore().partUpdateCounter();
+ return locPart == null || locPart.state() != OWNING ? null : locPart.dataStore().partUpdateCounter();
}
/**
@@ -2277,7 +2277,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
@Nullable GridDhtLocalPartition locPart =
internalCache(grid(gridName).cache(cacheName)).context().topology().localPartition(partId);
- return locPart == null ? null : locPart.dataStore().partUpdateCounter();
+ return locPart == null || locPart.state() != OWNING ? null : locPart.dataStore().partUpdateCounter();
}
/**
@@ -2308,7 +2308,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
PartitionUpdateCounter cntr = counter(partId, ignite.name());
if (cntr0 != null) {
- assertEquals("Expecting same counters", cntr0, cntr);
+ assertEquals("Expecting same counters [partId=" + partId + ']', cntr0, cntr);
if (withReserveCntr)
assertEquals("Expecting same reservation counters", cntr0.reserved(), cntr.reserved());
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite3.java
index e9792db..19c2fb9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite3.java
@@ -19,6 +19,8 @@ package org.apache.ignite.testsuites;
import java.util.HashSet;
import java.util.List;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithExpiryPolicy;
import org.apache.ignite.testframework.junits.DynamicSuite;
import org.junit.runner.RunWith;
@@ -35,7 +37,9 @@ public class IgnitePdsMvccTestSuite3 {
HashSet<Class> ignoredTests = new HashSet<>();
- // No ignored tests yet.
+ // TODO https://issues.apache.org/jira/browse/IGNITE-11937
+ ignoredTests.add(IgnitePdsContinuousRestartTest.class);
+ ignoredTests.add(IgnitePdsContinuousRestartTestWithExpiryPolicy.class);
return IgnitePdsTestSuite3.suite(ignoredTests);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
index 78c9480..b7b36d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest;
import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.PageLockTrackerManagerTest;
@@ -53,6 +54,9 @@ public class IgnitePdsMvccTestSuite4 {
ignoredTests.add(FileDownloaderTest.class);
ignoredTests.add(IgnitePdsTaskCancelingTest.class);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-11937
+ ignoredTests.add(IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes.class);
+
// Skip page lock tracker tests for MVCC suite.
ignoredTests.add(PageLockTrackerManagerTest.class);
ignoredTests.add(SharedPageLockTrackerTest.class);