You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/05/15 10:44:26 UTC
[ignite] branch master updated: IGNITE-11147 Rebalancing is not
cancelled if topologies are compatible. - Fixes #7428.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a6d3f52 IGNITE-11147 Rebalancing is not cancelled if topologies are compatible. - Fixes #7428.
a6d3f52 is described below
commit a6d3f52e41dfe602a3654cc67c59f460fd0dfd4e
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri May 15 13:35:13 2020 +0300
IGNITE-11147 Rebalancing is not cancelled if topologies are compatible. - Fixes #7428.
Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
.../org/apache/ignite/IgniteSystemProperties.java | 9 +
.../affinity/GridAffinityAssignmentCache.java | 4 +-
.../cache/GridCachePartitionExchangeManager.java | 108 +--
.../processors/cache/GridCachePreloader.java | 16 +-
.../cache/GridCachePreloaderAdapter.java | 11 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 2 +-
.../internal/processors/cache/WalStateManager.java | 178 +++--
.../preloader/GridDhtPartitionDemandMessage.java | 29 +-
.../dht/preloader/GridDhtPartitionDemander.java | 777 ++++++++++++---------
.../preloader/GridDhtPartitionsExchangeFuture.java | 32 +-
.../preloader/GridDhtPartitionsFullMessage.java | 2 +-
.../dht/preloader/GridDhtPreloader.java | 62 +-
.../dht/preloader/GridDhtPreloaderAssignments.java | 17 +-
.../dht/topology/GridClientPartitionTopology.java | 4 +-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 71 +-
.../ignite/cache/BreakRebalanceChainTest.java | 176 +++++
.../ignite/cache/NotOptimizedRebalanceTest.java | 288 ++++++++
.../ignite/cache/RebalanceCancellationTest.java | 495 +++++++++++++
.../ignite/cache/affinity/PendingExchangeTest.java | 343 +++++++++
.../IgniteCacheConfigurationTemplateTest.java | 2 +-
.../CacheLateAffinityAssignmentTest.java | 38 +-
...gniteCacheClientNodePartitionsExchangeTest.java | 12 +-
.../dht/topology/EvictPartitionInLogTest.java | 48 +-
...ocalWalModeChangeDuringRebalancingSelfTest.java | 3 -
.../persistence/db/wal/IgniteWalRebalanceTest.java | 171 ++---
.../snapshot/IgniteClusterSnapshotSelfTest.java | 7 +-
.../TxCrossCacheMapOnInvalidTopologyTest.java | 3 +-
.../platform/PlatformCacheAffinityVersionTask.java | 81 +++
.../junits/common/GridCommonAbstractTest.java | 1 -
.../ignite/testsuites/IgniteCacheTestSuite6.java | 2 +
.../ignite/testsuites/IgnitePdsTestSuite4.java | 6 +
.../near/IgniteCacheQueryNodeRestartSelfTest.java | 22 +-
...ngingBaselineCacheQueryNodeRestartSelfTest.java | 5 +-
...ableBaselineCacheQueryNodeRestartsSelfTest.java | 34 +
.../Compute/ComputeApiTest.cs | 12 +-
.../EventsTestLocalListeners.cs | 41 +-
.../Apache.Ignite.Core.Tests/TestUtils.Common.cs | 43 ++
37 files changed, 2461 insertions(+), 694 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 66c3811..ee5b2d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -974,6 +974,15 @@ public final class IgniteSystemProperties {
public static final String IGNITE_DISABLE_WAL_DURING_REBALANCING = "IGNITE_DISABLE_WAL_DURING_REBALANCING";
/**
+ * When the property is set to {@code false}, each subsequent exchange is compared with the previous one.
+ * If the last rebalance is equivalent to the new possible one, a new rebalance is not triggered.
+ * When the property is set to {@code true}, each exchange will try to trigger a new rebalance.
+ *
+ * Default is {@code false}.
+ */
+ public static final String IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION = "IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION";
+
+ /**
* Sets timeout for TCP client recovery descriptor reservation.
*/
public static final String IGNITE_NIO_RECOVERY_DESCRIPTOR_RESERVATION_TIMEOUT =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index f2037e5..1c8db77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -18,12 +18,12 @@
package org.apache.ignite.internal.processors.affinity;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
@@ -997,7 +997,7 @@ public class GridAffinityAssignmentCache {
/**
* @return All initialized versions.
*/
- public Collection<AffinityTopologyVersion> cachedVersions() {
+ public NavigableSet<AffinityTopologyVersion> cachedVersions() {
return affCache.keySet();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index b1ebd2c..d0bdd6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -86,6 +85,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Cac
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandLegacyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -231,14 +231,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private final ConcurrentNavigableMap<AffinityTopologyVersion, AffinityTopologyVersion> lastAffTopVers =
new ConcurrentSkipListMap<>();
- /**
- * Latest started rebalance topology version but possibly not finished yet. Value {@code NONE}
- * means that previous rebalance is undefined and the new one should be initiated.
- *
- * Should not be used to determine latest rebalanced topology.
- */
- private volatile AffinityTopologyVersion rebTopVer = NONE;
-
/** */
private GridFutureAdapter<?> reconnectExchangeFut;
@@ -1020,13 +1012,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @return Latest rebalance topology version or {@code NONE} if there is no info.
- */
- public AffinityTopologyVersion rebalanceTopologyVersion() {
- return rebTopVer;
- }
-
- /**
* @return Last initialized topology future.
*/
public GridDhtPartitionsExchangeFuture lastTopologyFuture() {
@@ -1944,8 +1929,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
- if (grp != null &&
- grp.localStartVersion().compareTo(entry.getValue().topologyVersion()) > 0)
+ if (grp != null && !grp.topology().initialized())
continue;
GridDhtPartitionTopology top = null;
@@ -3341,13 +3325,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (grp.isLocal())
continue;
- if (grp.preloader().rebalanceRequired(rebTopVer, exchFut))
- rebTopVer = NONE;
-
changed |= grp.topology().afterExchange(exchFut);
}
- if (!cctx.kernalContext().clientNode() && changed && !hasPendingServerExchange()) {
+ if (!cctx.kernalContext().clientNode() && changed) {
if (log.isDebugEnabled())
log.debug("Refresh partitions due to mapping was changed");
@@ -3355,11 +3336,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- // Schedule rebalance if force rebalance or force reassign occurs.
- if (exchFut == null)
- rebTopVer = NONE;
-
- if (!cctx.kernalContext().clientNode() && rebTopVer.equals(NONE)) {
+ if (rebalanceRequired(exchFut)) {
if (rebalanceDelay > 0)
U.sleep(rebalanceDelay);
@@ -3393,7 +3370,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
busy = false;
}
- if (assignsMap != null && rebTopVer.equals(NONE)) {
+ if (!F.isEmpty(assignsMap)) {
int size = assignsMap.size();
NavigableMap<CacheRebalanceOrder, List<Integer>> orderMap = new TreeMap<>();
@@ -3413,11 +3390,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
orderMap.get(order).add(grpId);
}
- Runnable r = null;
+ RebalanceFuture r = null;
- List<String> rebList = new LinkedList<>();
+ GridCompoundFuture<Boolean, Boolean> rebFut = new GridCompoundFuture<>();
- boolean assignsCancelled = false;
+ ArrayList<String> rebList = new ArrayList<>(size);
GridCompoundFuture<Boolean, Boolean> forcedRebFut = null;
@@ -3430,14 +3407,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPreloaderAssignments assigns = assignsMap.get(grpId);
- if (assigns != null)
- assignsCancelled |= assigns.cancelled();
-
- Runnable cur = grp.preloader().addAssignments(assigns,
+ RebalanceFuture cur = grp.preloader().addAssignments(assigns,
forcePreload,
cnt,
r,
- forcedRebFut);
+ forcedRebFut,
+ rebFut);
if (cur != null) {
rebList.add(grp.cacheOrGroupName());
@@ -3447,42 +3422,45 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ rebFut.markInitialized();
+
if (forcedRebFut != null)
forcedRebFut.markInitialized();
- if (assignsCancelled || hasPendingServerExchange()) {
- U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
- "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
- ", node=" + exchId.nodeId() + ']');
- }
- else if (r != null) {
+ if (r != null) {
Collections.reverse(rebList);
- U.log(log, "Rebalancing scheduled [order=" + rebList +
- ", top=" + resVer + ", force=" + (exchFut == null) +
- ", evt=" + exchId.discoveryEventName() +
- ", node=" + exchId.nodeId() + ']');
-
- rebTopVer = resVer;
+ RebalanceFuture finalR = r;
+ // Waits until compatible rebalances are finished.
// Start rebalancing cache groups chain. Each group will be rebalanced
// sequentially one by one e.g.:
// ignite-sys-cache -> cacheGroupR1 -> cacheGroupP2 -> cacheGroupR3
- r.run();
+ rebFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> f) {
+ U.log(log, "Rebalancing scheduled [order=" + rebList +
+ ", top=" + finalR.topologyVersion() +
+ ", evt=" + exchId.discoveryEventName() +
+ ", node=" + exchId.nodeId() + ']');
+
+ finalR.requestPartitions();
+ }
+ });
}
- else
+ else {
U.log(log, "Skipping rebalancing (nothing scheduled) " +
"[top=" + resVer + ", force=" + (exchFut == null) +
", evt=" + exchId.discoveryEventName() +
", node=" + exchId.nodeId() + ']');
+ }
}
- else
+ else {
U.log(log, "Skipping rebalancing (no affinity changes) " +
"[top=" + resVer +
- ", rebTopVer=" + rebTopVer +
", evt=" + exchId.discoveryEventName() +
", evtNode=" + exchId.nodeId() +
", client=" + cctx.kernalContext().clientNode() + ']');
+ }
}
catch (IgniteInterruptedCheckedException e) {
throw e;
@@ -3511,6 +3489,32 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
}
+
+ /**
+ * Rebalance is not required on a client node and is always required when the exchange future is null.
+ * In other cases, this method checks all caches and decides whether rebalancing is required or not
+ * for the specific exchange.
+ *
+ * @param exchFut Exchange future.
+ * @return {@code True} if rebalance is required at least for one of cache groups.
+ */
+ private boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
+ if (cctx.kernalContext().clientNode())
+ return false;
+
+ if (exchFut == null)
+ return true;
+
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
+ continue;
+
+ if (grp.preloader().rebalanceRequired(exchFut))
+ return true;
+ }
+
+ return false;
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 1b6aca1..c70f86b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -65,11 +66,10 @@ public interface GridCachePreloader {
public void onInitialExchangeComplete(@Nullable Throwable err);
/**
- * @param rebTopVer Previous rebalance topology version or {@code NONE} if there is no info.
* @param exchFut Completed exchange future.
* @return {@code True} if rebalance should be started (previous will be interrupted).
*/
- public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer, GridDhtPartitionsExchangeFuture exchFut);
+ public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut);
/**
* @param exchId Exchange ID.
@@ -85,15 +85,17 @@ public interface GridCachePreloader {
* @param assignments Assignments to add.
* @param forcePreload {@code True} if preload requested by {@link ForceRebalanceExchangeTask}.
* @param rebalanceId Rebalance id created by exchange thread.
- * @param next Runnable responsible for cache rebalancing chain.
+ * @param next Rebalance future that follows the current one.
* @param forcedRebFut External future for forced rebalance.
- * @return Rebalancing runnable.
+ * @param compatibleRebFut Future for waiting for compatible rebalances.
+ * @return Future if rebalance was planned or null.
*/
- public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+ public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments,
boolean forcePreload,
long rebalanceId,
- Runnable next,
- @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut);
+ final RebalanceFuture next,
+ @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
+ GridCompoundFuture<Boolean, Boolean> compatibleRebFut);
/**
* @return Future which will complete when preloader is safe to use.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index ff5e321..b382ba4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -138,8 +139,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer,
- GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
return true;
}
@@ -150,11 +150,12 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+ @Override public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments,
boolean forcePreload,
long rebalanceId,
- Runnable next,
- @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
+ RebalanceFuture next,
+ @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
+ GridCompoundFuture<Boolean, Boolean> compatibleRebFut) {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index daf0f26..2bc4fe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1630,7 +1630,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
U.error(log, "Failed to update partition counter. " +
"Most probably a node with most actual data is out of topology or data streamer is used " +
"in preload mode (allowOverride=false) concurrently with cache transactions [grpName=" +
- grp.name() + ", partId=" + partId + ']', e);
+ grp.cacheOrGroupName() + ", partId=" + partId + ']', e);
if (failNodeOnPartitionInconsistency)
ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 3164a1e..82ce336 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
@@ -121,7 +122,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
private boolean disconnected;
/** Holder for groups with temporary disabled WAL. */
- private volatile TemporaryDisabledWal tmpDisabledWal;
+ private final TemporaryDisabledWal tmpDisabledWal = new TemporaryDisabledWal();
/** */
private volatile WALDisableContext walDisableContext;
@@ -406,14 +407,21 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
* in OWNING state if such feature is enabled.
*
* @param topVer Topology version.
- * @param changedBaseline The exchange is caused by Baseline Topology change.
+ * @param exchFut Exchange future.
*/
- public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer, boolean changedBaseline) {
- if (changedBaseline
+ public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer, GridDhtPartitionsExchangeFuture exchFut) {
+ if (exchFut.changedBaseline()
&& IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED)
|| !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, true))
return;
+ ExchangeActions actions = exchFut.exchangeActions();
+
+ if (actions != null && !F.isEmpty(actions.cacheGroupsToStop())) {
+ for (ExchangeActions.CacheGroupActionData grpActionData : actions.cacheGroupsToStop())
+ onGroupRebalanceFinished(grpActionData.descriptor().groupId());
+ }
+
Set<Integer> grpsToEnableWal = new HashSet<>();
Set<Integer> grpsToDisableWal = new HashSet<>();
Set<Integer> grpsWithWalDisabled = new HashSet<>();
@@ -456,17 +464,10 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
if (hasOwning && !grp.localWalEnabled())
grpsToEnableWal.add(grp.groupId());
- else if (hasMoving && !hasOwning && grp.localWalEnabled()) {
+ else if (hasMoving && !hasOwning && grp.localWalEnabled())
grpsToDisableWal.add(grp.groupId());
-
- grpsWithWalDisabled.add(grp.groupId());
- }
- else if (!grp.localWalEnabled())
- grpsWithWalDisabled.add(grp.groupId());
}
- tmpDisabledWal = new TemporaryDisabledWal(grpsWithWalDisabled, topVer);
-
if (grpsToEnableWal.isEmpty() && grpsToDisableWal.isEmpty())
return;
@@ -481,78 +482,64 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
for (Integer grpId : grpsToEnableWal)
cctx.cache().cacheGroup(grpId).localWalEnabled(true, true);
- for (Integer grpId : grpsToDisableWal)
- cctx.cache().cacheGroup(grpId).localWalEnabled(false, true);
+ tmpDisabledWal.disable(grpsToDisableWal);
}
/**
* Callback when group rebalancing is finished. If there are no pending groups, it should trigger checkpoint and
* change partition states.
* @param grpId Group ID.
- * @param topVer Topology version.
*/
- public void onGroupRebalanceFinished(int grpId, AffinityTopologyVersion topVer) {
- TemporaryDisabledWal session0 = tmpDisabledWal;
+ public void onGroupRebalanceFinished(int grpId) {
+ Set<Integer> groupsToEnable = tmpDisabledWal.enable(grpId);
- if (session0 == null || session0.topVer.compareTo(topVer) > 0)
+ if (F.isEmpty(groupsToEnable))
return;
- session0.remainingGrps.remove(grpId);
-
- if (session0.remainingGrps.isEmpty()) {
- synchronized (mux) {
- if (tmpDisabledWal != session0)
- return;
-
- for (Integer grpId0 : session0.disabledGrps) {
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
-
- assert grp != null;
-
- if (!grp.localWalEnabled())
- grp.localWalEnabled(true, false);
- }
+ grpId = F.first(groupsToEnable);
- tmpDisabledWal = null;
- }
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
- // Pending updates in groups with disabled WAL are not protected from crash.
- // Need to trigger checkpoint for attempt to persist them.
- CheckpointProgress cpFut = triggerCheckpoint("wal-local-state-changed-rebalance-finished-" + topVer);
+ assert grp != null : "Can not find group with id: " + grpId;
- assert cpFut != null;
+ AffinityTopologyVersion lastGroupTop = grp.topology().readyTopologyVersion();
- // It's safe to switch partitions to owning state only if checkpoint was successfully finished.
- cpFut.futureFor(FINISHED).listen(new IgniteInClosureX<IgniteInternalFuture>() {
- @Override public void applyx(IgniteInternalFuture future) {
- if (X.hasCause(future.error(), NodeStoppingException.class))
- return;
+ // Pending updates in groups with disabled WAL are not protected from crash.
+ // Need to trigger checkpoint for attempt to persist them.
+ CheckpointProgress cpFut = triggerCheckpoint("wal-local-state-changed-rebalance-finished-" + lastGroupTop);
- for (Integer grpId0 : session0.disabledGrps) {
- try {
- cctx.database().walEnabled(grpId0, true, true);
- }
- catch (Exception e) {
- if (!X.hasCause(e, NodeStoppingException.class))
- throw e;
- }
+ assert cpFut != null;
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
+ // It's safe to switch partitions to owning state only if checkpoint was successfully finished.
+ cpFut.futureFor(FINISHED).listen(new IgniteInClosureX<IgniteInternalFuture>() {
+ @Override public void applyx(IgniteInternalFuture future) {
+ if (X.hasCause(future.error(), NodeStoppingException.class))
+ return;
- if (grp != null)
- grp.topology().ownMoving(topVer);
- else if (log.isDebugEnabled())
- log.debug("Cache group was destroyed before checkpoint finished, [grpId=" + grpId0 + ']');
+ for (Integer grpId0 : groupsToEnable) {
+ try {
+ cctx.database().walEnabled(grpId0, true, true);
+ }
+ catch (Exception e) {
+ if (!X.hasCause(e, NodeStoppingException.class))
+ throw e;
}
- if (log.isDebugEnabled())
- log.debug("Refresh partitions due to rebalance finished");
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
- // Trigger exchange for switching to ideal assignment when all nodes are ready.
- cctx.exchange().refreshPartitions();
+ if (grp != null)
+ grp.topology().ownMoving(lastGroupTop);
+ else if (log.isDebugEnabled())
+ log.debug("Cache group was destroyed before checkpoint finished, [grpId=" + grpId0 + ']');
}
- });
- }
+
+ if (log.isDebugEnabled())
+ log.debug("Refresh partitions due to rebalance finished");
+
+ // Trigger exchange for switching to ideal assignment when all nodes are ready.
+ cctx.exchange().refreshPartitions();
+ }
+ });
}
/**
@@ -1183,29 +1170,66 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
/**
*
*/
- private static class TemporaryDisabledWal {
+ private class TemporaryDisabledWal {
/** Groups with disabled WAL. */
- private final Set<Integer> disabledGrps;
+ private final Set<Integer> disabledGrps = new HashSet<>();
/** Remaining groups. */
- private final Set<Integer> remainingGrps;
+ private final Set<Integer> remainingGrps = new HashSet<>();
- /** Topology version*/
- private final AffinityTopologyVersion topVer;
+ /**
+ * Disables WAL of groups specified.
+ *
+ * @param disabledGrps Groups whose WAL should be disabled.
+ */
+ public synchronized void disable(Set<Integer> disabledGrps) {
+ this.disabledGrps.addAll(disabledGrps);
+ this.remainingGrps.addAll(disabledGrps);
- /** */
- public TemporaryDisabledWal(
- Set<Integer> disabledGrps,
- AffinityTopologyVersion topVer
- ) {
- this.disabledGrps = Collections.unmodifiableSet(disabledGrps);
- this.remainingGrps = new HashSet<>(disabledGrps);
- this.topVer = topVer;
+ for (Integer grpId : disabledGrps)
+ cctx.cache().cacheGroup(grpId).localWalEnabled(false, true);
+ }
+
+ /**
+ * Records that the given group no longer needs its WAL to stay disabled.
+ * Once no temporarily disabled groups remain pending,
+ * WAL is enabled locally for the disabled groups and those groups are returned.
+ *
+ * @param grpId Group id.
+ * @return Set of groups whose WAL was locally enabled, or an empty set if some groups are still pending.
+ */
+ public synchronized Set<Integer> enable(int grpId) {
+ remainingGrps.remove(grpId);
+
+ if (remainingGrps.isEmpty()) {
+ HashSet<Integer> walEnablingGrps = new HashSet<>(disabledGrps.size());
+
+ for (Integer grpId0 : disabledGrps) {
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
+
+ if (grp == null) {
+ log.warning("Group stopped. Chnage WAL state does not need [id=" + grpId0 + "]");
+
+ continue;
+ }
+
+ if (!grp.localWalEnabled())
+ grp.localWalEnabled(true, false);
+
+ walEnablingGrps.add(grpId0);
+ }
+
+ disabledGrps.clear();
+
+ return walEnablingGrps;
+ }
+
+ return Collections.EMPTY_SET;
}
}
/**
- *
+ * Temporary storage for disabled WALs of group.
*/
public static class WALDisableContext implements MetastorageLifecycleListener {
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 918cfc9..db5cc5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -44,6 +45,9 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
/** */
public static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.4.4");
+ /** Cache rebalance topic. */
+ private static final Object REBALANCE_TOPIC = GridCachePartitionExchangeManager.rebalanceTopic(0);
+
/** Rebalance id. */
private long rebalanceId;
@@ -56,7 +60,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
/** Topic. */
@GridDirectTransient
- private Object topic;
+ private Object topic = REBALANCE_TOPIC;
/** Serialized topic. */
private byte[] topicBytes;
@@ -76,11 +80,21 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
* @param grpId Cache group ID.
*/
GridDhtPartitionDemandMessage(long rebalanceId, @NotNull AffinityTopologyVersion topVer, int grpId) {
+ this(rebalanceId, topVer, grpId, new IgniteDhtDemandedPartitionsMap());
+ }
+
+ /**
+ * @param rebalanceId Rebalance id for this node.
+ * @param topVer Topology version.
+ * @param grpId Cache group ID.
+ * @param parts Demand partition map.
+ */
+ GridDhtPartitionDemandMessage(long rebalanceId, @NotNull AffinityTopologyVersion topVer, int grpId,
+ IgniteDhtDemandedPartitionsMap parts) {
this.grpId = grpId;
this.rebalanceId = rebalanceId;
this.topVer = topVer;
-
- parts = new IgniteDhtDemandedPartitionsMap();
+ this.parts = parts;
}
/**
@@ -178,15 +192,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
}
/**
- * @param topic Topic.
- * @deprecated Obsolete (Kept to solve compatibility issues).
- */
- @Deprecated
- void topic(Object topic) {
- this.topic = topic;
- }
-
- /**
* @return Worker ID.
*/
int workerId() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index ca768fc..2c64e6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -24,13 +24,16 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -117,9 +120,6 @@ public class GridDhtPartitionDemander {
/** Last exchange future. */
private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
- /** Cache rebalance topic. */
- private final Object rebalanceTopic;
-
/** Rebalancing last cancelled time. */
private final AtomicLong lastCancelledTime = new AtomicLong(-1);
@@ -147,8 +147,6 @@ public class GridDhtPartitionDemander {
Map<Integer, Object> tops = new HashMap<>();
- rebalanceTopic = GridCachePartitionExchangeManager.rebalanceTopic(0);
-
String metricGroupName = metricName(CACHE_GROUP_METRICS_PREFIX, grp.cacheOrGroupName());
MetricRegistry mreg = grp.shared().kernalContext().metric().registry(metricGroupName);
@@ -256,16 +254,6 @@ public class GridDhtPartitionDemander {
}
/**
- * @param fut Future.
- * @return {@code True} if rebalance topology version changed by exchange thread or force
- * reassing exchange occurs, see {@link RebalanceReassignExchangeTask} for details.
- */
- private boolean topologyChanged(RebalanceFuture fut) {
- return !ctx.exchange().rebalanceTopologyVersion().equals(fut.topVer) ||
- fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions.
- }
-
- /**
* Sets last exchange future.
*
* @param lastFut Last future to set.
@@ -289,16 +277,19 @@ public class GridDhtPartitionDemander {
* @param assignments Assignments to process.
* @param force {@code True} if preload request by {@link ForceRebalanceExchangeTask}.
* @param rebalanceId Rebalance id generated from exchange thread.
- * @param next Runnable responsible for cache rebalancing chain.
+ * @param next The next rebalance routine in the chain.
* @param forcedRebFut External future for forced rebalance.
- * @return Rebalancing runnable.
+ * @param compatibleRebFut Future for waiting for compatible rebalances.
+ *
+ * @return Rebalancing future or {@code null} to exclude an assignment from a chain.
*/
- Runnable addAssignments(
+ @Nullable RebalanceFuture addAssignments(
final GridDhtPreloaderAssignments assignments,
boolean force,
long rebalanceId,
- final Runnable next,
- @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut
+ final RebalanceFuture next,
+ @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut,
+ GridCompoundFuture<Boolean, Boolean> compatibleRebFut
) {
if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assignments);
@@ -310,21 +301,62 @@ public class GridDhtPartitionDemander {
if ((delay == 0 || force) && assignments != null) {
final RebalanceFuture oldFut = rebalanceFut;
- final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId, lastCancelledTime);
+ if (assignments.cancelled()) { // Pending exchange.
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing skipped due to cancelled assignments.");
+
+ return null;
+ }
+
+ if (assignments.isEmpty()) { // Nothing to rebalance.
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing skipped due to empty assignments.");
+
+ if (oldFut.isInitial())
+ oldFut.onDone(true);
+
+ ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
+
+ ctx.exchange().scheduleResendPartitions();
+
+ return null;
+ }
+
+ if (!force && (!oldFut.isDone() || oldFut.result()) && oldFut.compatibleWith(assignments)) {
+ if (!oldFut.isDone())
+ compatibleRebFut.add(oldFut);
- if (!grp.localWalEnabled())
+ return null;
+ }
+
+ final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId, next, lastCancelledTime);
+
+ if (!grp.localWalEnabled()) {
fut.listen(new IgniteInClosureX<IgniteInternalFuture<Boolean>>() {
@Override public void applyx(IgniteInternalFuture<Boolean> future) throws IgniteCheckedException {
if (future.get())
- ctx.walState().onGroupRebalanceFinished(grp.groupId(), assignments.topologyVersion());
+ ctx.walState().onGroupRebalanceFinished(grp.groupId());
}
});
+ }
if (!oldFut.isInitial())
- oldFut.cancel();
+ oldFut.tryCancel();
else
fut.listen(f -> oldFut.onDone(f.result()));
+ // Make sure partitions scheduled for full rebalancing are first cleared.
+ if (grp.persistenceEnabled()) {
+ for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) {
+ for (Integer partId : e.getValue().partitions().fullSet()) {
+ GridDhtLocalPartition part = grp.topology().localPartition(partId);
+
+ if (part != null && part.state() == MOVING)
+ part.clearAsync();
+ }
+ }
+ }
+
if (forcedRebFut != null)
forcedRebFut.add(fut);
@@ -356,45 +388,7 @@ public class GridDhtPartitionDemander {
fut.sendRebalanceStartedEvent();
- if (assignments.cancelled()) { // Pending exchange.
- if (log.isDebugEnabled())
- log.debug("Rebalancing skipped due to cancelled assignments.");
-
- fut.onDone(false);
-
- fut.sendRebalanceFinishedEvent();
-
- return null;
- }
-
- if (assignments.isEmpty()) { // Nothing to rebalance.
- if (log.isDebugEnabled())
- log.debug("Rebalancing skipped due to empty assignments.");
-
- fut.onDone(true);
-
- ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
-
- fut.sendRebalanceFinishedEvent();
-
- return null;
- }
-
- return () -> {
- if (next != null)
- fut.listen(f -> {
- try {
- if (f.get()) // Not cancelled.
- next.run(); // Starts next cache rebalancing (according to the order).
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug(e.getMessage());
- }
- });
-
- requestPartitions(fut, assignments);
- };
+ return fut;
}
else if (delay > 0) {
for (GridCacheContext cctx : grp.caches()) {
@@ -433,212 +427,12 @@ public class GridDhtPartitionDemander {
}
/**
- * Asynchronously sends initial demand messages formed from {@code assignments} and initiates supply-demand processes.
- *
- * For each node participating in rebalance process method distributes set of partitions for that node to several stripes (topics).
- * It means that each stripe containing a subset of partitions can be processed in parallel.
- * The number of stripes are controlled by {@link IgniteConfiguration#getRebalanceThreadPoolSize()} property.
- *
- * Partitions that can be rebalanced using only WAL are called historical, others are called full.
- *
- * Before sending messages, method awaits partitions clearing for full partitions.
- *
- * @param fut Rebalance future.
- * @param assignments Assignments.
- */
- private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assignments) {
- assert fut != null;
-
- if (topologyChanged(fut)) {
- fut.cancel();
-
- return;
- }
-
- if (!ctx.kernalContext().grid().isRebalanceEnabled()) {
- if (log.isTraceEnabled())
- log.trace("Cancel partition demand because rebalance disabled on current node.");
-
- fut.cancel();
-
- return;
- }
-
- final CacheConfiguration cfg = grp.config();
-
- for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) {
- final ClusterNode node = e.getKey();
-
- GridDhtPartitionDemandMessage d = e.getValue();
-
- final IgniteDhtDemandedPartitionsMap parts;
-
- synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel cancellation.
- if (fut.isDone())
- break;
-
- if (rebalanceFut.startTime == -1)
- rebalanceFut.startTime = System.currentTimeMillis();
-
- parts = fut.remaining.get(node.id());
-
- U.log(log, "Prepared rebalancing [grp=" + grp.cacheOrGroupName()
- + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ", partitionsCount=" + parts.size()
- + ", topVer=" + fut.topologyVersion() + "]");
- }
-
- if (!parts.isEmpty()) {
- d.topic(rebalanceTopic);
- d.rebalanceId(fut.rebalanceId);
- d.timeout(grp.preloader().timeout());
-
- IgniteInternalFuture<?> clearAllFuture = clearFullPartitions(fut, d.partitions().fullSet());
-
- // Start rebalancing after clearing full partitions is finished.
- clearAllFuture.listen(f -> ctx.kernalContext().closure().runLocalSafe(() -> {
- if (fut.isDone())
- return;
-
- try {
- if (log.isInfoEnabled())
- log.info("Starting rebalance routine [" + grp.cacheOrGroupName() +
- ", topVer=" + fut.topologyVersion() +
- ", supplier=" + node.id() +
- ", fullPartitions=" + S.compact(parts.fullSet()) +
- ", histPartitions=" + S.compact(parts.historicalSet()) + "]");
-
- ctx.io().sendOrderedMessage(node, rebalanceTopic,
- d.convertIfNeeded(node.version()), grp.ioPolicy(), d.timeout());
-
- // Cleanup required in case partitions demanded in parallel with cancellation.
- synchronized (fut) {
- if (fut.isDone())
- fut.cleanupRemoteContexts(node.id());
- }
- }
- catch (IgniteCheckedException e1) {
- ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class);
-
- if (cause != null)
- log.warning("Failed to send initial demand request to node. " + e1.getMessage());
- else
- log.error("Failed to send initial demand request to node.", e1);
-
- fut.cancel();
- }
- catch (Throwable th) {
- log.error("Runtime error caught during initial demand request sending.", th);
-
- fut.cancel();
- }
- }, true));
- }
- }
- }
-
- /**
- * Creates future which will be completed when all {@code fullPartitions} are cleared.
- *
- * @param fut Rebalance future.
- * @param fullPartitions Set of full partitions need to be cleared.
- * @return Future which will be completed when given partitions are cleared.
- */
- private IgniteInternalFuture<?> clearFullPartitions(RebalanceFuture fut, Set<Integer> fullPartitions) {
- final GridFutureAdapter clearAllFuture = new GridFutureAdapter();
-
- if (fullPartitions.isEmpty()) {
- clearAllFuture.onDone();
-
- return clearAllFuture;
- }
-
- for (GridCacheContext cctx : grp.caches()) {
- if (cctx.statisticsEnabled()) {
- final CacheMetricsImpl metrics = cctx.cache().metrics0();
-
- metrics.rebalanceClearingPartitions(fullPartitions.size());
- }
- }
-
- final AtomicInteger clearingPartitions = new AtomicInteger(fullPartitions.size());
-
- for (int partId : fullPartitions) {
- if (fut.isDone()) {
- clearAllFuture.onDone();
-
- return clearAllFuture;
- }
-
- GridDhtLocalPartition part = grp.topology().localPartition(partId);
-
- if (part != null && part.state() == MOVING) {
- part.onClearFinished(f -> {
- if (!fut.isDone()) {
- // Cancel rebalance if partition clearing was failed.
- if (f.error() != null) {
- for (GridCacheContext cctx : grp.caches()) {
- if (cctx.statisticsEnabled()) {
- final CacheMetricsImpl metrics = cctx.cache().metrics0();
-
- metrics.rebalanceClearingPartitions(0);
- }
- }
-
- log.error("Unable to await partition clearing " + part, f.error());
-
- fut.cancel();
-
- clearAllFuture.onDone(f.error());
- }
- else {
- int remaining = clearingPartitions.decrementAndGet();
-
- for (GridCacheContext cctx : grp.caches()) {
- if (cctx.statisticsEnabled()) {
- final CacheMetricsImpl metrics = cctx.cache().metrics0();
-
- metrics.rebalanceClearingPartitions(remaining);
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("Partition is ready for rebalance [grp=" + grp.cacheOrGroupName()
- + ", p=" + part.id() + ", remaining=" + remaining + "]");
-
- if (remaining == 0)
- clearAllFuture.onDone();
- }
- }
- else
- clearAllFuture.onDone();
- });
- }
- else {
- int remaining = clearingPartitions.decrementAndGet();
-
- for (GridCacheContext cctx : grp.caches()) {
- if (cctx.statisticsEnabled()) {
- final CacheMetricsImpl metrics = cctx.cache().metrics0();
-
- metrics.rebalanceClearingPartitions(remaining);
- }
- }
-
- if (remaining == 0)
- clearAllFuture.onDone();
- }
- }
-
- return clearAllFuture;
- }
-
- /**
* Enqueues supply message.
*/
public void registerSupplyMessage(final UUID nodeId, final GridDhtPartitionSupplyMessage supplyMsg, final Runnable r) {
final RebalanceFuture fut = rebalanceFut;
- if (!topologyChanged(fut) && fut.isActual(supplyMsg.rebalanceId())) {
+ if (fut.isActual(supplyMsg.rebalanceId())) {
boolean historical = false;
for (Integer p : supplyMsg.infos().keySet()) {
@@ -679,6 +473,13 @@ public class GridDhtPartitionDemander {
fut.cancelLock.readLock().lock();
try {
+ if (fut.isDone()) {
+ if (log.isDebugEnabled())
+ log.debug("Supply message ignored (rebalance completed) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+
+ return;
+ }
+
ClusterNode node = ctx.node(nodeId);
if (node == null) {
@@ -689,7 +490,7 @@ public class GridDhtPartitionDemander {
}
// Topology already changed (for the future that supply message based on).
- if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId())) {
+ if (!fut.isActual(supplyMsg.rebalanceId())) {
if (log.isDebugEnabled())
log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
@@ -845,12 +646,10 @@ public class GridDhtPartitionDemander {
d.timeout(grp.preloader().timeout());
- d.topic(rebalanceTopic);
-
- if (!topologyChanged(fut) && !fut.isDone()) {
+ if (!fut.isDone()) {
// Send demand message.
try {
- ctx.io().sendOrderedMessage(node, rebalanceTopic,
+ ctx.io().sendOrderedMessage(node, d.topic(),
d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.preloader().timeout());
if (log.isDebugEnabled())
@@ -865,7 +664,7 @@ public class GridDhtPartitionDemander {
else {
if (log.isDebugEnabled())
log.debug("Will not request next demand message [" + demandRoutineInfo(nodeId, supplyMsg) +
- ", topChanged=" + topologyChanged(fut) + ", rebalanceFuture=" + fut + "]");
+ ", rebalanceFuture=" + fut + "]");
}
}
catch (IgniteSpiException | IgniteCheckedException e) {
@@ -888,7 +687,7 @@ public class GridDhtPartitionDemander {
int p,
final UUID nodeId,
final GridDhtPartitionSupplyMessage supplyMsg) {
- if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId()))
+ if (fut.isDone() || !fut.isActual(supplyMsg.rebalanceId()))
return;
long queued = fut.queued.get(p).sum();
@@ -1185,12 +984,33 @@ public class GridDhtPartitionDemander {
}
/**
+ * Internal states of rebalance future.
+ */
+ private enum RebalanceFutureState {
+ /** Init. */
+ INIT,
+
+ /** Started. */
+ STARTED,
+
+ /** Marked as cancelled. */
+ MARK_CANCELLED,
+ }
+
+ /**
*
*/
public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
+ /** State updater. */
+ private static final AtomicReferenceFieldUpdater<RebalanceFuture, RebalanceFutureState> STATE_UPD =
+ AtomicReferenceFieldUpdater.newUpdater(RebalanceFuture.class, RebalanceFutureState.class, "state");
+
/** */
private final GridCacheSharedContext<?, ?> ctx;
+ /** Internal state. */
+ private volatile RebalanceFutureState state = RebalanceFutureState.INIT;
+
/** */
private final CacheGroupContext grp;
@@ -1248,23 +1068,37 @@ public class GridDhtPartitionDemander {
/** Rebalancing last cancelled time. */
private final AtomicLong lastCancelledTime;
+ /** Next future in chain. */
+ private final RebalanceFuture next;
+
+ /** Assignments. */
+ private final GridDhtPreloaderAssignments assignments;
+
+ /** Partitions which have been scheduled for rebalance from specific supplier. */
+ private final Map<ClusterNode, Set<Integer>> rebalancingParts;
+
/**
* @param grp Cache group.
* @param assignments Assignments.
* @param log Logger.
- *
- @param rebalanceId Rebalance id.
+ * @param rebalanceId Rebalance id.
+ * @param next Next rebalance future.
+ * @param lastCancelledTime Cancelled time.
*/
RebalanceFuture(
CacheGroupContext grp,
GridDhtPreloaderAssignments assignments,
IgniteLogger log,
long rebalanceId,
+ RebalanceFuture next,
AtomicLong lastCancelledTime) {
assert assignments != null;
+ this.rebalancingParts = U.newHashMap(assignments.size());
+ this.assignments = assignments;
exchId = assignments.exchangeId();
topVer = assignments.topologyVersion();
+ this.next = next;
this.lastCancelledTime = lastCancelledTime;
@@ -1276,6 +1110,11 @@ public class GridDhtPartitionDemander {
partitionsLeft.addAndGet(v.partitions().size());
+ rebalancingParts.put(k, new HashSet<Integer>(v.partitions().size()) {{
+ addAll(v.partitions().historicalSet());
+ addAll(v.partitions().fullSet());
+ }});
+
historical.addAll(v.partitions().historicalSet());
Stream.concat(v.partitions().historicalSet().stream(), v.partitions().fullSet().stream())
@@ -1301,6 +1140,8 @@ public class GridDhtPartitionDemander {
* Dummy future. Will be done by real one.
*/
RebalanceFuture() {
+ this.rebalancingParts = null;
+ this.assignments = null;
this.exchId = null;
this.topVer = null;
this.ctx = null;
@@ -1309,10 +1150,224 @@ public class GridDhtPartitionDemander {
this.rebalanceId = -1;
this.routines = 0;
this.cancelLock = new ReentrantReadWriteLock();
+ this.next = null;
this.lastCancelledTime = new AtomicLong();
}
/**
+ * Asynchronously sends initial demand messages formed from {@code assignments} and initiates supply-demand processes.
+ *
+ * For each node participating in rebalance process method distributes set of partitions for that node to several stripes (topics).
+ * It means that each stripe containing a subset of partitions can be processed in parallel.
+ * The number of stripes are controlled by {@link IgniteConfiguration#getRebalanceThreadPoolSize()} property.
+ *
+ * Partitions that can be rebalanced using only WAL are called historical, others are called full.
+ *
+ * Before sending messages, method awaits partitions clearing for full partitions.
+ */
+ public void requestPartitions() {
+ if (!STATE_UPD.compareAndSet(this, RebalanceFutureState.INIT, RebalanceFutureState.STARTED)) {
+ cancel();
+
+ return;
+ }
+
+ if (!ctx.kernalContext().grid().isRebalanceEnabled()) {
+ if (log.isTraceEnabled())
+ log.trace("Cancel partition demand because rebalance disabled on current node.");
+
+ cancel();
+
+ return;
+ }
+
+ if (isDone()) {
+ assert !result() : "Rebalance future was done, but partitions never requested [grp="
+ + grp.cacheOrGroupName() + ", topVer=" + topologyVersion() + "]";
+
+ return;
+ }
+
+ final CacheConfiguration cfg = grp.config();
+
+ for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) {
+ final ClusterNode node = e.getKey();
+
+ GridDhtPartitionDemandMessage d = e.getValue();
+
+ final IgniteDhtDemandedPartitionsMap parts;
+
+ synchronized (this) { // Synchronized to prevent consistency issues in case of parallel cancellation.
+ if (isDone())
+ return;
+
+ if (startTime == -1)
+ startTime = System.currentTimeMillis();
+
+ parts = remaining.get(node.id());
+
+ U.log(log, "Prepared rebalancing [grp=" + grp.cacheOrGroupName()
+ + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ", partitionsCount=" + parts.size()
+ + ", topVer=" + topologyVersion() + "]");
+ }
+
+ if (!parts.isEmpty()) {
+ d.rebalanceId(rebalanceId);
+ d.timeout(grp.preloader().timeout());
+
+ IgniteInternalFuture<?> clearAllFuture = clearFullPartitions(this, d.partitions().fullSet());
+
+ // Start rebalancing after clearing full partitions is finished.
+ clearAllFuture.listen(f -> ctx.kernalContext().closure().runLocalSafe(() -> {
+ if (isDone())
+ return;
+
+ try {
+ if (log.isInfoEnabled())
+ log.info("Starting rebalance routine [" + grp.cacheOrGroupName() +
+ ", topVer=" + topologyVersion() +
+ ", supplier=" + node.id() +
+ ", fullPartitions=" + S.compact(parts.fullSet()) +
+ ", histPartitions=" + S.compact(parts.historicalSet()) + "]");
+
+ ctx.io().sendOrderedMessage(node, d.topic(),
+ d.convertIfNeeded(node.version()), grp.ioPolicy(), d.timeout());
+
+ // Cleanup required in case partitions demanded in parallel with cancellation.
+ synchronized (this) {
+ if (isDone())
+ cleanupRemoteContexts(node.id());
+ }
+ }
+ catch (IgniteCheckedException e1) {
+ ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class);
+
+ if (cause != null)
+ log.warning("Failed to send initial demand request to node. " + e1.getMessage());
+ else
+ log.error("Failed to send initial demand request to node.", e1);
+
+ cancel();
+ }
+ catch (Throwable th) {
+ log.error("Runtime error caught during initial demand request sending.", th);
+
+ cancel();
+ }
+ }, true));
+ }
+ }
+ }
+
+ /**
+ * Creates future which will be completed when all {@code fullPartitions} are cleared.
+ *
+ * @param fut Rebalance future.
+ * @param fullPartitions Set of full partitions need to be cleared.
+ * @return Future which will be completed when given partitions are cleared.
+ */
+ private IgniteInternalFuture<?> clearFullPartitions(RebalanceFuture fut, Set<Integer> fullPartitions) {
+ final GridFutureAdapter clearAllFuture = new GridFutureAdapter();
+
+ if (fullPartitions.isEmpty()) {
+ clearAllFuture.onDone();
+
+ return clearAllFuture;
+ }
+
+ for (GridCacheContext cctx : grp.caches()) {
+ if (cctx.statisticsEnabled()) {
+ final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+ metrics.rebalanceClearingPartitions(fullPartitions.size());
+ }
+ }
+
+ final AtomicInteger clearingPartitions = new AtomicInteger(fullPartitions.size());
+
+ for (int partId : fullPartitions) {
+ if (fut.isDone()) {
+ clearAllFuture.onDone();
+
+ return clearAllFuture;
+ }
+
+ GridDhtLocalPartition part = grp.topology().localPartition(partId);
+
+ if (part != null && part.state() == MOVING) {
+ part.onClearFinished(f -> {
+ if (!fut.isDone()) {
+ // Cancel rebalance if partition clearing was failed.
+ if (f.error() != null) {
+ for (GridCacheContext cctx : grp.caches()) {
+ if (cctx.statisticsEnabled()) {
+ final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+ metrics.rebalanceClearingPartitions(0);
+ }
+ }
+
+ log.error("Unable to await partition clearing " + part, f.error());
+
+ fut.cancel();
+
+ clearAllFuture.onDone(f.error());
+ }
+ else {
+ int remaining = clearingPartitions.decrementAndGet();
+
+ for (GridCacheContext cctx : grp.caches()) {
+ if (cctx.statisticsEnabled()) {
+ final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+ metrics.rebalanceClearingPartitions(remaining);
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Partition is ready for rebalance [grp=" + grp.cacheOrGroupName()
+ + ", p=" + part.id() + ", remaining=" + remaining + "]");
+
+ if (remaining == 0)
+ clearAllFuture.onDone();
+ }
+ }
+ else
+ clearAllFuture.onDone();
+ });
+ }
+ else {
+ int remaining = clearingPartitions.decrementAndGet();
+
+ for (GridCacheContext cctx : grp.caches()) {
+ if (cctx.statisticsEnabled()) {
+ final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+ metrics.rebalanceClearingPartitions(remaining);
+ }
+ }
+
+ if (remaining == 0)
+ clearAllFuture.onDone();
+ }
+ }
+
+ return clearAllFuture;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ if (next != null)
+ next.requestPartitions(); // Go to the next item in the chain if it exists.
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* @return Topology version.
*/
public AffinityTopologyVersion topologyVersion() {
@@ -1335,6 +1390,16 @@ public class GridDhtPartitionDemander {
}
/**
+ * Cancels the future, or marks it for cancellation ({@code RebalanceFutureState#MARK_CANCELLED}) if it has not started yet.
+ */
+ private void tryCancel() {
+ if (STATE_UPD.compareAndSet(this, RebalanceFutureState.INIT, RebalanceFutureState.MARK_CANCELLED))
+ return;
+
+ cancel();
+ }
+
+ /**
* Cancels this future.
*
* @return {@code True}.
@@ -1389,37 +1454,33 @@ public class GridDhtPartitionDemander {
/**
* @param nodeId Node id.
*/
- private void cancel(UUID nodeId) {
- synchronized (this) {
- if (isDone())
- return;
+ private synchronized void cancel(UUID nodeId) {
+ if (isDone())
+ return;
- U.log(log, ("Cancelled rebalancing [grp=" + grp.cacheOrGroupName() +
- ", supplier=" + nodeId + ", topVer=" + topologyVersion() + ']'));
+ U.log(log, ("Cancelled rebalancing [grp=" + grp.cacheOrGroupName() +
+ ", supplier=" + nodeId + ", topVer=" + topologyVersion() + ']'));
- cleanupRemoteContexts(nodeId);
+ cleanupRemoteContexts(nodeId);
- remaining.remove(nodeId);
+ remaining.remove(nodeId);
- onDone(false); // Finishing rebalance future as non completed.
+ onDone(false); // Finishing rebalance future as non completed.
- checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
- }
+ checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
}
/**
* @param nodeId Node id.
* @param p Partition id.
*/
- private void partitionMissed(UUID nodeId, int p) {
- synchronized (this) {
- if (isDone())
- return;
+ private synchronized void partitionMissed(UUID nodeId, int p) {
+ if (isDone())
+ return;
- missed.computeIfAbsent(nodeId, k -> new HashSet<>());
+ missed.computeIfAbsent(nodeId, k -> new HashSet<>());
- missed.get(nodeId).add(p);
- }
+ missed.get(nodeId).add(p);
}
/**
@@ -1443,8 +1504,6 @@ public class GridDhtPartitionDemander {
try {
Object rebalanceTopic = GridCachePartitionExchangeManager.rebalanceTopic(0);
- d.topic(rebalanceTopic);
-
ctx.io().sendOrderedMessage(node, rebalanceTopic,
d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.preloader().timeout());
}
@@ -1458,44 +1517,42 @@ public class GridDhtPartitionDemander {
* @param nodeId Node id.
* @param p Partition number.
*/
- private void partitionDone(UUID nodeId, int p, boolean updateState) {
- synchronized (this) {
- if (updateState && grp.localWalEnabled())
- grp.topology().own(grp.topology().localPartition(p));
+ private synchronized void partitionDone(UUID nodeId, int p, boolean updateState) {
+ if (updateState && grp.localWalEnabled())
+ grp.topology().own(grp.topology().localPartition(p));
- if (isDone())
- return;
+ if (isDone())
+ return;
- if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchId.discoveryEvent());
+ if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchId.discoveryEvent());
- IgniteDhtDemandedPartitionsMap parts = remaining.get(nodeId);
+ IgniteDhtDemandedPartitionsMap parts = remaining.get(nodeId);
- assert parts != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
- ", part=" + p + "]";
+ assert parts != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
+ ", part=" + p + "]";
- boolean rmvd = parts.remove(p);
+ boolean rmvd = parts.remove(p);
- assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
- ", part=" + p + ", left=" + parts + "]";
+ assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
+ ", part=" + p + ", left=" + parts + "]";
if (rmvd)
partitionsLeft.decrementAndGet();
- if (parts.isEmpty()) {
- int remainingRoutines = remaining.size() - 1;
+ if (parts.isEmpty()) {
+ int remainingRoutines = remaining.size() - 1;
- U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
- "rebalancing [grp=" + grp.cacheOrGroupName() +
- ", supplier=" + nodeId +
- ", topVer=" + topologyVersion() +
- ", progress=" + (routines - remainingRoutines) + "/" + routines + "]"));
+ U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
+ "rebalancing [grp=" + grp.cacheOrGroupName() +
+ ", supplier=" + nodeId +
+ ", topVer=" + topologyVersion() +
+ ", progress=" + (routines - remainingRoutines) + "/" + routines + "]"));
- remaining.remove(nodeId);
- }
-
- checkIsDone();
+ remaining.remove(nodeId);
}
+
+ checkIsDone();
}
/**
@@ -1538,7 +1595,8 @@ public class GridDhtPartitionDemander {
log.debug("Partitions have been scheduled to resend [reason=" +
"Rebalance is done [grp=" + grp.cacheOrGroupName() + "]");
- ctx.exchange().scheduleResendPartitions();
+ if (!cancelled)
+ ctx.exchange().scheduleResendPartitions();
Collection<Integer> m = new HashSet<>();
@@ -1587,6 +1645,87 @@ public class GridDhtPartitionDemander {
rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchId.discoveryEvent());
}
+ /**
+ * @param otherAssignments Newest assignments.
+ *
+ * @return {@code True} if this future is compatible with the other assignments, {@code False} otherwise.
+ */
+ public boolean compatibleWith(GridDhtPreloaderAssignments otherAssignments) {
+ if (isInitial() || ((GridDhtPreloader)grp.preloader()).disableRebalancingCancellationOptimization())
+ return false;
+
+ if (ctx.exchange().lastAffinityChangedTopologyVersion(topVer).equals(
+ ctx.exchange().lastAffinityChangedTopologyVersion(otherAssignments.topologyVersion()))) {
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing is forced on the same topology [grp="
+ + grp.cacheOrGroupName() + ", " + "top=" + topVer + ']');
+
+ return false;
+ }
+
+ if (otherAssignments.affinityReassign()) {
+ if (log.isDebugEnabled())
+ log.debug("Some of owned partitions were reassigned through coordinator [grp="
+ + grp.cacheOrGroupName() + ", " + "init=" + topVer + " ,other=" + otherAssignments.topologyVersion() + ']');
+
+ return false;
+ }
+
+ Set<Integer> p0 = new HashSet<>();
+ Set<Integer> p1 = new HashSet<>();
+
+ // Not compatible if a supplier has left.
+ for (ClusterNode node : rebalancingParts.keySet()) {
+ if (!grp.cacheObjectContext().kernalContext().discovery().alive(node))
+ return false;
+ }
+
+ for (Set<Integer> partitions : rebalancingParts.values())
+ p0.addAll(partitions);
+
+ for (GridDhtPartitionDemandMessage message : otherAssignments.values()) {
+ p1.addAll(message.partitions().fullSet());
+ p1.addAll(message.partitions().historicalSet());
+ }
+
+ // Not compatible if not a subset.
+ if (!p0.containsAll(p1))
+ return false;
+
+ p1 = Stream.concat(grp.affinity().cachedAffinity(otherAssignments.topologyVersion())
+ .primaryPartitions(ctx.localNodeId()).stream(), grp.affinity()
+ .cachedAffinity(otherAssignments.topologyVersion()).backupPartitions(ctx.localNodeId()).stream())
+ .collect(Collectors.toSet());
+
+ NavigableSet<AffinityTopologyVersion> toCheck = grp.affinity().cachedVersions()
+ .headSet(otherAssignments.topologyVersion(), false);
+
+ if (!toCheck.contains(topVer)) {
+ log.warning("History is not enough for checking compatible last rebalance, new rebalance started " +
+ "[grp=" + grp.cacheOrGroupName() + ", lastTop=" + topVer + ']');
+
+ return false;
+ }
+
+ for (AffinityTopologyVersion previousTopVer : toCheck.descendingSet()) {
+ if (previousTopVer.before(topVer))
+ break;
+
+ if (!ctx.exchange().lastAffinityChangedTopologyVersion(previousTopVer).equals(previousTopVer))
+ continue;
+
+ p0 = Stream.concat(grp.affinity().cachedAffinity(previousTopVer).primaryPartitions(ctx.localNodeId()).stream(),
+ grp.affinity().cachedAffinity(previousTopVer).backupPartitions(ctx.localNodeId()).stream())
+ .collect(Collectors.toSet());
+
+ // Not compatible if owners are different.
+ if (!p0.equals(p1))
+ return false;
+ }
+
+ return true;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(RebalanceFuture.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0751cac..50d0677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -377,6 +377,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** Specified only in case of 'cluster is fully rebalanced' state achieved. */
private volatile RebalancedInfo rebalancedInfo;
+ /** Some partitions owned by affinity had their state changed to moving on this exchange. */
+ private volatile boolean affinityReassign;
+
/**
* @param cctx Cache context.
* @param busyLock Busy lock.
@@ -2427,7 +2430,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (changedAffinity())
- cctx.walState().changeLocalStatesOnExchangeDone(res, changedBaseline());
+ cctx.walState().changeLocalStatesOnExchangeDone(res, this);
}
}
catch (Throwable t) {
@@ -3697,8 +3700,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
- && ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions())
+ && ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions()) {
+ markAffinityReassign();
+
assignPartitionsStates();
+ }
}
else {
if (exchCtx.events().hasServerJoin())
@@ -4464,6 +4470,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap()))
cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
+ if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+ DiscoveryCustomMessage discoveryCustomMessage = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
+
+ if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
+ && ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions())
+ markAffinityReassign();
+ }
+
onDone(resTopVer, null);
}
catch (IgniteCheckedException e) {
@@ -5172,6 +5186,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Marks this future as affinity reassign.
+ */
+ public void markAffinityReassign() {
+ affinityReassign = true;
+ }
+
+ /**
+ * @return True if some owned partition was reassigned, false otherwise.
+ */
+ public boolean affinityReassign() {
+ return affinityReassign;
+ }
+
+ /**
* Add or merge updates received from coordinator while exchange in progress.
*
* @param fullMsg Full message with exchangeId = null.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 9282ded..d34bebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -873,7 +873,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
ClusterNode newMapSentBy = discovery.node(updMap.nodeId());
if (newMapSentBy == null)
- return;
+ continue;
if (currentMapSentBy == null || newMapSentBy.order() > currentMapSentBy.order() || updMap.compareTo(currMap) >= 0)
partitions().put(grpId, updMap);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 67ba018..8f0d390 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
@@ -35,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -44,11 +46,11 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
@@ -59,6 +61,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** Default preload resend timeout. */
public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
+ /** Disable rebalancing cancellation optimization. */
+ private final boolean disableRebalancingCancellationOptimization = IgniteSystemProperties.getBoolean(
+ IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION);
+
/** */
private GridDhtPartitionTopology top;
@@ -129,6 +135,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
return new NodeStoppingException("Operation has been cancelled (cache or node is stopping).");
}
+ /**
+ * @return {@code True} if the rebalancing cancellation optimization is disabled.
+ */
+ public boolean disableRebalancingCancellationOptimization() {
+ return disableRebalancingCancellationOptimization;
+ }
+
/** {@inheritDoc} */
@Override public void onInitialExchangeComplete(@Nullable Throwable err) {
if (err == null)
@@ -145,34 +158,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer,
- GridDhtPartitionsExchangeFuture exchFut) {
- if (ctx.kernalContext().clientNode() || rebTopVer.equals(AffinityTopologyVersion.NONE))
+ @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
+ if (ctx.kernalContext().clientNode())
return false; // No-op.
- if (exchFut.resetLostPartitionFor(grp.cacheOrGroupName()))
- return true;
-
- if (exchFut.localJoinExchange())
- return true; // Required, can have outdated updSeq partition counter if node reconnects.
-
- if (!grp.affinity().cachedVersions().contains(rebTopVer)) {
- assert rebTopVer.compareTo(grp.localStartVersion()) <= 0 :
- "Empty history allowed only for newly started cache group [rebTopVer=" + rebTopVer +
- ", localStartTopVer=" + grp.localStartVersion() + ']';
-
- return true; // Required, since no history info available.
- }
-
- final IgniteInternalFuture<Boolean> rebFut = rebalanceFuture();
-
- if (rebFut.isDone() && !rebFut.result())
- return true; // Required, previous rebalance cancelled.
-
AffinityTopologyVersion lastAffChangeTopVer =
ctx.exchange().lastAffinityChangedTopologyVersion(exchFut.topologyVersion());
- return lastAffChangeTopVer.compareTo(rebTopVer) > 0;
+ return lastAffChangeTopVer.equals(exchFut.topologyVersion());
}
/** {@inheritDoc} */
@@ -184,7 +177,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
GridDhtPartitionTopology top = grp.topology();
if (!grp.rebalanceEnabled())
- return new GridDhtPreloaderAssignments(exchId, top.readyTopologyVersion());
+ return new GridDhtPreloaderAssignments(exchId, top.readyTopologyVersion(), false);
int partitions = grp.affinity().partitions();
@@ -195,7 +188,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
", grp=" + grp.name() +
", topVer=" + top.readyTopologyVersion() + ']';
- GridDhtPreloaderAssignments assignments = new GridDhtPreloaderAssignments(exchId, topVer);
+ GridDhtPreloaderAssignments assignments = new GridDhtPreloaderAssignments(exchId, topVer,
+ exchFut != null && exchFut.affinityReassign());
AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
@@ -245,8 +239,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
part.resetUpdateCounter();
}
- assert part.state() == MOVING : "Partition has invalid state for rebalance " + aff.topologyVersion() + " " + part;
-
ClusterNode histSupplier = null;
if (grp.persistenceEnabled() && exchFut != null) {
@@ -274,11 +266,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
msg.partitions().addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partitions);
}
else {
- // If for some reason (for example if supplier fails and new supplier is elected) partition is
- // assigned for full rebalance force clearing if not yet set.
- if (grp.persistenceEnabled() && exchFut != null && !exchFut.isClearingPartition(grp, p))
- part.clearAsync();
-
List<ClusterNode> picked = remoteOwners(p, topVer);
if (picked.isEmpty()) {
@@ -380,14 +367,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public Runnable addAssignments(
+ @Override public RebalanceFuture addAssignments(
GridDhtPreloaderAssignments assignments,
boolean forceRebalance,
long rebalanceId,
- Runnable next,
- @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut
+ final RebalanceFuture next,
+ @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
+ GridCompoundFuture<Boolean, Boolean> compatibleRebFut
) {
- return demander.addAssignments(assignments, forceRebalance, rebalanceId, next, forcedRebFut);
+ return demander.addAssignments(assignments, forceRebalance, rebalanceId, next, forcedRebFut, compatibleRebFut);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 8783c73..acb1c52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -39,16 +39,31 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
/** */
private boolean cancelled;
+ /** Some partitions owned by affinity had their state changed to moving. */
+ private final boolean affinityReassign;
+
/**
* @param exchangeId Exchange ID.
* @param topVer Last join order.
*/
- public GridDhtPreloaderAssignments(GridDhtPartitionExchangeId exchangeId, AffinityTopologyVersion topVer) {
+ public GridDhtPreloaderAssignments(
+ GridDhtPartitionExchangeId exchangeId,
+ AffinityTopologyVersion topVer,
+ boolean affinityReassign
+ ) {
assert exchangeId != null;
assert topVer.topologyVersion() > 0 : topVer;
this.exchangeId = exchangeId;
this.topVer = topVer;
+ this.affinityReassign = affinityReassign;
+ }
+
+ /**
+ * @return True if partitions were reassigned.
+ */
+ public boolean affinityReassign() {
+ return affinityReassign;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index e2fdf3c..2574c25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -713,9 +713,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
* @return True if current partition map should be overwritten by new partition map, false in other case.
*/
private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
- return newMap != null &&
- (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 ||
- newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence());
+ return newMap != null && newMap.compareTo(currentMap) > 0;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 528bd63..a3cd8b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -787,20 +787,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Will not own partition (there are owners to rebalance from) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", owners = " + owners + ']');
}
-
- // It's important to clear non empty moving partitions before full rebalancing.
- // Consider the scenario:
- // Node1 has keys k1 and k2 in the same partition.
- // Node2 started rebalancing from Node1.
- // Node2 received k1, k2 and failed before moving partition to OWNING state.
- // Node1 removes k2 but update has not been delivered to Node1 because of failure.
- // After new full rebalance Node1 will only send k1 to Node2 causing lost removal.
- // NOTE: avoid calling clearAsync for partition twice per topology version.
- if (grp.persistenceEnabled() &&
- exchFut.isClearingPartition(grp, locPart.id()) &&
- !locPart.isClearing() &&
- !locPart.isEmpty())
- locPart.clearAsync();
}
else
updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
@@ -1418,10 +1404,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return True if current partition map should be overwritten by new partition map, false in other case
*/
private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
- return newMap != null &&
- (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 ||
- newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 &&
- newMap.updateSequence() > currentMap.updateSequence());
+ return newMap != null && newMap.compareTo(currentMap) > 0;
}
/** {@inheritDoc} */
@@ -1498,16 +1481,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) {
- U.warn(log, "Stale version for full partition map update message (will ignore) [" +
- "grp=" + grp.cacheOrGroupName() +
- ", lastTopChange=" + lastTopChangeVer +
- ", readTopVer=" + readyTopVer +
- ", msgVer=" + msgTopVer + ']');
-
- return false;
- }
-
boolean fullMapUpdated = (node2part == null);
if (node2part != null) {
@@ -1519,7 +1492,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled()) {
log.debug("Overriding partition map in full update map [" +
- "grp=" + grp.cacheOrGroupName() +
+ "node=" + part.nodeId() +
+ ", grp=" + grp.cacheOrGroupName() +
", exchVer=" + exchangeVer +
", curPart=" + mapString(part) +
", newPart=" + mapString(newPart) + ']');
@@ -1529,8 +1503,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
updateSeq.setIfGreater(newPart.updateSequence());
}
else {
- // If for some nodes current partition has a newer map,
- // then we keep the newer value.
+ // If for some nodes current partition has a newer map, then we keep the newer value.
+ if (log.isDebugEnabled()) {
+ log.debug("Partitions map for the node keeps newer value than message [" +
+ "node=" + part.nodeId() +
+ ", grp=" + grp.cacheOrGroupName() +
+ ", exchVer=" + exchangeVer +
+ ", curPart=" + mapString(part) +
+ ", newPart=" + mapString(newPart) + ']');
+ }
+
partMap.put(part.nodeId(), part);
}
}
@@ -2294,7 +2276,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
- Set<UUID> newOwners = entry.getValue();
+ Set<UUID> maxCounterPartOwners = entry.getValue();
GridDhtLocalPartition locPart = localPartition(part);
@@ -2302,7 +2284,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
continue;
// Partition state should be mutated only on joining nodes if they are exists for the exchange.
- if (joinedNodes.isEmpty() && !newOwners.contains(locNodeId)) {
+ if (joinedNodes.isEmpty() && !maxCounterPartOwners.contains(locNodeId)) {
rebalancePartition(part, !haveHist.contains(part), exchFut);
res.computeIfAbsent(locNodeId, n -> new HashSet<>()).add(part);
@@ -2312,7 +2294,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// Then process remote partitions.
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
- Set<UUID> newOwners = entry.getValue();
+ Set<UUID> maxCounterPartOwners = entry.getValue();
for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
UUID remoteNodeId = remotes.getKey();
@@ -2327,7 +2309,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (state != OWNING)
continue;
- if (!newOwners.contains(remoteNodeId)) {
+ if (!maxCounterPartOwners.contains(remoteNodeId)) {
partMap.put(part, MOVING);
partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
@@ -2707,12 +2689,19 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
AffinityTopologyVersion lastAffChangeVer = ctx.exchange().lastAffinityChangedTopologyVersion(lastTopChangeVer);
- if (lastAffChangeVer.compareTo(rebFinishedTopVer) > 0 && log.isInfoEnabled())
- log.info("Affinity topology changed, no MOVING partitions will be owned " +
- "[rebFinishedTopVer=" + rebFinishedTopVer +
- ", lastAffChangeVer=" + lastAffChangeVer + "]");
+ if (lastAffChangeVer.compareTo(rebFinishedTopVer) > 0) {
+ if (log.isInfoEnabled()) {
+ log.info("Affinity topology changed, no MOVING partitions will be owned " +
+ "[rebFinishedTopVer=" + rebFinishedTopVer +
+ ", lastAffChangeVer=" + lastAffChangeVer + "]");
+ }
+
+ grp.preloader().forceRebalance();
+
+ return;
+ }
- for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
+ for (GridDhtLocalPartition locPart : currentLocalPartitions()) {
if (locPart.state() == MOVING) {
boolean reserved = locPart.reserve();
@@ -2720,7 +2709,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (reserved && locPart.state() == MOVING &&
lastAffChangeVer.compareTo(rebFinishedTopVer) <= 0 &&
rebFinishedTopVer.compareTo(lastTopChangeVer) <= 0)
- grp.topology().own(locPart);
+ own(locPart);
}
finally {
if (reserved)
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/BreakRebalanceChainTest.java b/modules/core/src/test/java/org/apache/ignite/cache/BreakRebalanceChainTest.java
new file mode 100644
index 0000000..96df899
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/BreakRebalanceChainTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test checks what happens when the rebalance chain is broken into two parts.
+ */
+public class BreakRebalanceChainTest extends GridCommonAbstractTest {
+ /** Node name suffix. Used for {@link CustomNodeFilter}. */
+ public static final String FILTERED_NODE_SUFFIX = "_filtered";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()))
+ .setCacheConfiguration(
+ new CacheConfiguration(DEFAULT_CACHE_NAME + 1)
+ .setAffinity(new RendezvousAffinityFunction(false, 15))
+ .setNodeFilter(new CustomNodeFilter())
+ .setRebalanceOrder(1)
+ .setBackups(1),
+ new CacheConfiguration(DEFAULT_CACHE_NAME + 2)
+ .setAffinity(new RendezvousAffinityFunction(false, 15))
+ .setRebalanceOrder(2)
+ .setBackups(1),
+ new CacheConfiguration(DEFAULT_CACHE_NAME + 3)
+ .setAffinity(new RendezvousAffinityFunction(false, 15))
+ .setNodeFilter(new CustomNodeFilter())
+ .setRebalanceOrder(3)
+ .setBackups(1));
+ }
+
+ /**
+ * Custom node filter. It filters out all nodes whose consistent ID contains {@link BreakRebalanceChainTest#FILTERED_NODE_SUFFIX}.
+ */
+ private static class CustomNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return !node.consistentId().toString().contains(FILTERED_NODE_SUFFIX);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void test() throws Exception {
+ startGrids(2);
+
+ awaitPartitionMapExchange();
+
+ IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(2)));
+
+ TestRecordingCommunicationSpi communicationSpi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+ ConcurrentHashMap<String, Long> rebalancingCaches = new ConcurrentHashMap<>();
+
+ communicationSpi.blockMessages((node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+
+ long rebId = U.field(demandMsg, "rebalanceId");
+
+ if (demandMsg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME + 1)) {
+ rebalancingCaches.put(DEFAULT_CACHE_NAME + 1, rebId);
+
+ return true;
+ }
+ else if (demandMsg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME + 2)) {
+ rebalancingCaches.put(DEFAULT_CACHE_NAME + 2, rebId);
+
+ return true;
+ }
+ else if (demandMsg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME + 3)) {
+ rebalancingCaches.put(DEFAULT_CACHE_NAME + 3, rebId);
+
+ return true;
+ }
+ }
+
+ return false;
+ });
+
+ startGrid(cfg);
+
+ communicationSpi.waitForBlocked();
+
+ assertEquals("Several parallel rebalace detected.", blockedDemand(rebalancingCaches), 1);
+
+ IgniteEx filteredNode = startGrid(getTestIgniteInstanceName(3) + FILTERED_NODE_SUFFIX);
+
+ IgniteInternalFuture<Boolean>[] futs = getAllRebalanceFutures(filteredNode);
+
+ for (IgniteInternalFuture fut : futs)
+ fut.get(10_000);
+
+ assertEquals("Several parallel rebalace detected.", blockedDemand(rebalancingCaches), 1);
+
+ communicationSpi.stopBlock();
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * @param rebalancingCaches Map of blocked demand messages for caches.
+ * @return Count of blocked messages.
+ */
+ private int blockedDemand(ConcurrentHashMap<String, Long> rebalancingCaches) {
+ int mesages = 0;
+
+ for (Map.Entry<String, Long> entry : rebalancingCaches.entrySet()) {
+ if (entry.getValue() > 0) {
+ mesages++;
+
+ log.info("Demand for partitions on cache " + entry.getKey() + " rebalance id " + entry.getValue());
+ }
+ }
+
+ return mesages;
+ }
+
+ /**
+ * Finds the rebalance futures of all caches for the specified Ignite instance.
+ *
+ * @param ignite Ignite.
+ * @return Array of rebalance futures.
+ */
+ private IgniteInternalFuture<Boolean>[] getAllRebalanceFutures(IgniteEx ignite) {
+ IgniteInternalFuture<Boolean>[] futs = new IgniteInternalFuture[ignite.cacheNames().size()];
+
+ int i = 0;
+
+ for (String cache : ignite.cacheNames()) {
+ futs[i] = ignite.context().cache().cacheGroup(CU.cacheId(cache)).preloader().rebalanceFuture();
+
+ i++;
+ }
+ return futs;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/NotOptimizedRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/cache/NotOptimizedRebalanceTest.java
new file mode 100644
index 0000000..5bc63d5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/NotOptimizedRebalanceTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
+
+/**
+ * Test checks rebalance behavior when several exchanges are triggered in sequence.
+ */
+@WithSystemProperty(key = IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION, value = "true")
+public class NotOptimizedRebalanceTest extends GridCommonAbstractTest {
+ /** Start cluster nodes. */
+ public static final int NODES_CNT = 3;
+
+ /** Persistence enabled. */
+ public boolean persistenceEnabled;
+
+ /** Count of backup partitions. */
+ public static final int BACKUPS = 2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration()
+            .setMaxSize(100L * 1024 * 1024)
+            .setPersistenceEnabled(persistenceEnabled);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration()
+            .setWalSegmentSize(4 * 1024 * 1024)
+            .setDefaultDataRegionConfiguration(dfltRegionCfg);
+
+        CacheConfiguration cacheCfg = new CacheConfiguration(DEFAULT_CACHE_NAME)
+            .setAffinity(new RendezvousAffinityFunction(false, 15)).setBackups(BACKUPS);
+
+        return super.getConfiguration(igniteInstanceName).setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setDataStorageConfiguration(storageCfg).setCacheConfiguration(cacheCfg);
+    }
+
+ /** {@inheritDoc} Calls {@code cleanPersistenceDir()} before each test. */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ // The persistence directory is cleaned only after all grids have been stopped.
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Checks rebalance with persistence while an extra server node repeatedly leaves and joins.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceWithPersistence() throws Exception {
+ testRebalance(true, true);
+ }
+
+ /**
+ * Checks rebalance without persistence while an extra server node repeatedly leaves and joins.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceWithoutPersistence() throws Exception {
+ testRebalance(false, true);
+ }
+
+ /**
+ * Checks rebalance with persistence while a client node repeatedly leaves and joins.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceWithPersistenceAndClient() throws Exception {
+ testRebalance(true, false);
+ }
+
+ /**
+ * Checks rebalance without persistence while a client node repeatedly leaves and joins.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceWithoutPersistenceAndClient() throws Exception {
+ testRebalance(false, false);
+ }
+
+ /**
+ * Blocks rebalance on a restarted node, restarts an extra node three times and checks the rebalance futures.
+ * @param persistence Persistent flag.
+ * @param serverJoin If {@code true} the extra node is started as a server, otherwise as a client.
+ * @throws Exception If failed.
+ */
+ public void testRebalance(boolean persistence, boolean serverJoin) throws Exception {
+ persistenceEnabled = persistence;
+
+ IgniteEx ignite0 = startGrids(NODES_CNT);
+
+ ignite0.cluster().active(true);
+
+ ignite0.cluster().baselineAutoAdjustEnabled(false);
+
+ IgniteEx newNode = serverJoin ? startGrid(NODES_CNT) : startClientGrid(NODES_CNT);
+
+ grid(1).close();
+
+ for (String cache : ignite0.cacheNames())
+ loadData(ignite0, cache);
+
+ awaitPartitionMapExchange();
+ // Restart node 1 with its demand messages blocked, so its rebalance cannot complete yet.
+ TestRecordingCommunicationSpi commSpi1 = startNodeWithBlockingRebalance(getTestIgniteInstanceName(1));
+
+ commSpi1.waitForBlocked();
+
+ Map<CacheGroupContext, IgniteInternalFuture<Boolean>> futs = getAllRebalanceFuturesByGroup(grid(1));
+
+ checkAllFuturesProcessing(futs);
+
+ for (int i = 0; i < 3; i++) {
+ newNode.close();
+
+ checkTopology(NODES_CNT);
+
+ newNode = serverJoin ? startGrid(NODES_CNT) : startClientGrid(NODES_CNT);
+
+ checkTopology(NODES_CNT + 1);
+ }
+ // Server restarts are expected to cancel the blocked rebalance, client restarts are not.
+ if (serverJoin)
+ checkAllFuturesCancelled(futs);
+ else
+ checkAllFuturesProcessing(futs);
+
+ commSpi1.stopBlock();
+
+ awaitPartitionMapExchange();
+
+ Map<CacheGroupContext, IgniteInternalFuture<Boolean>> newFuts = getAllRebalanceFuturesByGroup(grid(1));
+
+ for (Map.Entry<CacheGroupContext, IgniteInternalFuture<Boolean>> grpFut : futs.entrySet()) {
+ IgniteInternalFuture<Boolean> fut = grpFut.getValue();
+ IgniteInternalFuture<Boolean> newFut = newFuts.get(grpFut.getKey());
+
+ if (serverJoin)
+ assertTrue(futureInfoString(fut), fut.isDone() && !fut.get());
+ else
+ assertSame(fut, newFut);
+
+ assertTrue(futureInfoString(newFut), newFut.isDone() && newFut.get());
+ }
+ }
+
+    /**
+     * @param futs Rebalance futures by cache group; each must be done with a {@code false} result.
+     * @throws org.apache.ignite.IgniteCheckedException If getting a future result failed.
+     */
+    public void checkAllFuturesCancelled(Map<CacheGroupContext, IgniteInternalFuture<Boolean>> futs)
+        throws org.apache.ignite.IgniteCheckedException {
+        for (IgniteInternalFuture<Boolean> rebFut : futs.values())
+            assertFalse(futureInfoString(rebFut), !rebFut.isDone() || rebFut.get());
+    }
+
+    /**
+     * @param futs Rebalance futures by cache group; none of them may be completed yet.
+     */
+    public void checkAllFuturesProcessing(Map<CacheGroupContext, IgniteInternalFuture<Boolean>> futs) {
+        for (IgniteInternalFuture<Boolean> rebFut : futs.values())
+            assertTrue(futureInfoString(rebFut), !rebFut.isDone());
+    }
+
+    /**
+     * Collects the current rebalance future of every cache of the given node, keyed by cache group context.
+     *
+     * @param ignite Ignite.
+     * @return Map of cache group context to its rebalance future.
+     */
+    private Map<CacheGroupContext, IgniteInternalFuture<Boolean>> getAllRebalanceFuturesByGroup(IgniteEx ignite) {
+        Map<CacheGroupContext, IgniteInternalFuture<Boolean>> futs = new HashMap<>();
+
+        for (String cache : ignite.cacheNames()) {
+            // Resolve the group once and use the same context for both the key and the future.
+            CacheGroupContext grp = ignite.context().cache().cacheGroup(CU.cacheId(cache));
+            futs.put(grp, grp.preloader().rebalanceFuture());
+        }
+        return futs;
+    }
+
+    /**
+     * Builds a diagnostic description of a rebalance future for assertion messages.
+     * @param rebalanceFuture Rebalance future.
+     * @return Future itself, its completion flag and its result ({@code "None"} while it is not done).
+     */
+    private String futureInfoString(IgniteInternalFuture<Boolean> rebalanceFuture) {
+        StringBuilder info = new StringBuilder("Fut: ").append(rebalanceFuture);
+        info.append(" is done: ").append(rebalanceFuture.isDone());
+        info.append(" result: ").append(rebalanceFuture.isDone() ? rebalanceFuture.result() : "None");
+        return info.toString();
+    }
+
+    /**
+     * Starts a node whose communication SPI holds back demand messages of the default cache group.
+     *
+     * @param name Node instance name.
+     * @return Test communication SPI.
+     * @throws Exception If failed.
+     */
+    private TestRecordingCommunicationSpi startNodeWithBlockingRebalance(String name) throws Exception {
+        IgniteConfiguration nodeCfg = optimize(getConfiguration(name));
+
+        TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)nodeCfg.getCommunicationSpi();
+
+        spi.blockMessages((node, msg) -> {
+            if (!(msg instanceof GridDhtPartitionDemandMessage))
+                return false;
+
+            GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+
+            // Demands of any group other than the default cache pass through.
+            if (demandMsg.groupId() != CU.cacheId(DEFAULT_CACHE_NAME))
+                return false;
+
+            info("Message was caught: " + msg.getClass().getSimpleName()
+                + " rebalanceId = " + U.field(demandMsg, "rebalanceId")
+                + " to: " + node.consistentId()
+                + " by cache id: " + demandMsg.groupId());
+
+            return true;
+        });
+
+        startGrid(nodeCfg);
+
+        return spi;
+    }
+
+    /**
+     * Loads 100 entries (keys 0..99, values taken from {@link System#nanoTime()}) into the specified cache.
+     *
+     * @param ignite Ignite.
+     * @param cacheName Cache name.
+     */
+    private void loadData(Ignite ignite, String cacheName) {
+        try (IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer(cacheName)) {
+            streamer.allowOverwrite(true);
+
+            for (int key = 0; key < 100; key++)
+                streamer.addData(key, System.nanoTime());
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java
new file mode 100644
index 0000000..25b44e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test cases in which rebalance keeps being processed and is not cancelled during various exchange events.
+ */
+public class RebalanceCancellationTest extends GridCommonAbstractTest {
+ /** Start cluster nodes. */
+ public static final int NODES_CNT = 3;
+
+ /** Count of backup partitions. */
+ public static final int BACKUPS = 2;
+
+ /** In memory data region name. */
+ public static final String MEM_REGION = "mem-region";
+
+ /** In memory cache name. */
+ public static final String MEM_REGOIN_CACHE = DEFAULT_CACHE_NAME + "_mem";
+
+ /** In memory dynamic cache name. */
+ public static final String DYNAMIC_CACHE_NAME = DEFAULT_CACHE_NAME + "_dynamic";
+
+ /** Node name suffix. Used for {@link CustomNodeFilter}. */
+ public static final String FILTERED_NODE_SUFFIX = "_filtered";
+
+ /** Persistence enabled. */
+ public boolean persistenceEnabled;
+
+ /** Add additional non-persistence data region. */
+ public boolean addtiotionalMemRegion;
+
+ /** Filter node. */
+ public boolean filterNode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+            new DataRegionConfiguration().setPersistenceEnabled(persistenceEnabled));
+
+        CacheConfiguration dfltCacheCfg = new CacheConfiguration(DEFAULT_CACHE_NAME)
+            .setAffinity(new RendezvousAffinityFunction(false, 15)).setBackups(BACKUPS);
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName).setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setDataStorageConfiguration(storageCfg).setCacheConfiguration(dfltCacheCfg);
+
+        if (addtiotionalMemRegion) {
+            // The extra cache is placed into a separate data region named MEM_REGION.
+            CacheConfiguration memCacheCfg = new CacheConfiguration(MEM_REGOIN_CACHE)
+                .setDataRegionName(MEM_REGION).setBackups(BACKUPS);
+
+            cfg.setCacheConfiguration(cfg.getCacheConfiguration()[0], memCacheCfg);
+
+            cfg.getDataStorageConfiguration()
+                .setDataRegionConfigurations(new DataRegionConfiguration().setName(MEM_REGION));
+        }
+
+        if (filterNode) {
+            for (CacheConfiguration ccfg : cfg.getCacheConfiguration())
+                ccfg.setNodeFilter(new CustomNodeFilter());
+        }
+
+        return cfg;
+    }
+
+ /**
+ * Custom node filter. It filters all node that name contains a {@link FILTERED_NODE_SUFFIX}.
+ */
+ private static class CustomNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return !node.consistentId().toString().contains(FILTERED_NODE_SUFFIX);
+ }
+ }
+
+ /** {@inheritDoc} Calls {@code cleanPersistenceDir()} before each test. */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} Stops all grids first, then calls {@code cleanPersistenceDir()}. */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Non-baseline node repeatedly leaves and rejoins a cluster with only persistent caches during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceNoneBltNodeLeftOnOnlyPersistenceCluster() throws Exception {
+ testRebalanceNoneBltNode(true, false, false);
+ }
+
+ /**
+ * Non-baseline node repeatedly leaves and rejoins a cluster with only in-memory caches during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceNoneBltNodeLeftOnOnlyInMemoryCluster() throws Exception {
+ testRebalanceNoneBltNode(false, false, false);
+ }
+
+ /**
+ * Non-baseline node repeatedly leaves and rejoins a cluster with persistent and in-memory caches during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceNoneBltNodeLeftOnMixedCluster() throws Exception {
+ testRebalanceNoneBltNode(true, true, false);
+ }
+
+ /**
+ * Non-baseline node is repeatedly failed and restarted in a cluster with only persistent caches during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceNoneBltNodeFailedOnOnlyPersistenceCluster() throws Exception {
+ testRebalanceNoneBltNode(true, false, true);
+ }
+
+ /**
+ * Non-baseline node is repeatedly failed and restarted in a cluster with only in-memory caches during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceNoneBltNodeFailedOnOnlyInMemoryCluster() throws Exception {
+ testRebalanceNoneBltNode(false, false, true);
+ }
+
+ /**
+ * Non-baseline node is repeatedly failed and restarted in a cluster with persistent and in-memory caches.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceNoneBltNodeFailedOnMixedCluster() throws Exception {
+ testRebalanceNoneBltNode(true, true, true);
+ }
+
+ /**
+ * Filtered node repeatedly leaves and rejoins a cluster with a persistent region during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceFilteredNodeOnOnlyPersistenceCluster() throws Exception {
+ testRebalanceFilteredNode(true, false);
+ }
+
+ /**
+ * Filtered node repeatedly leaves and rejoins a cluster with an in-memory region during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceFilteredNodeOnOnlyInMemoryCluster() throws Exception {
+ testRebalanceFilteredNode(false, false);
+ }
+
+ /**
+ * Filtered node repeatedly leaves and rejoins a cluster with persistent and in-memory regions during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceFilteredNodeOnMixedCluster() throws Exception {
+ testRebalanceFilteredNode(true, true);
+ }
+
+ /**
+ * Dynamic cache is created and destroyed several times on a persistent cluster during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceDynamicCacheOnOnlyPersistenceCluster() throws Exception {
+ testRebalanceDynamicCache(true, false);
+ }
+
+ /**
+ * Dynamic cache is created and destroyed several times on an in-memory cluster during rebalance.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceDynamicCacheOnOnlyInMemoryCluster() throws Exception {
+ testRebalanceDynamicCache(false, false);
+ }
+
+ /**
+ * Dynamic cache is created and destroyed several times on a cluster with persistent and in-memory regions.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceDynamicCacheOnMixedCluster() throws Exception {
+ testRebalanceDynamicCache(true, true);
+ }
+
+ /**
+ * Blocks rebalance on a restarted node, then creates and destroys a dynamic cache three times.
+ *
+ * @param persistence Persistent flag.
+ * @param addtiotionalRegion Use additional (non default) region.
+ * @throws Exception If failed.
+ */
+ public void testRebalanceDynamicCache(boolean persistence, boolean addtiotionalRegion) throws Exception {
+ persistenceEnabled = persistence;
+ addtiotionalMemRegion = addtiotionalRegion;
+
+ IgniteEx ignite0 = startGrids(NODES_CNT);
+
+ ignite0.cluster().active(true);
+
+ grid(1).close();
+
+ for (String cache : ignite0.cacheNames())
+ loadData(ignite0, cache);
+
+ awaitPartitionMapExchange();
+ // Restart node 1 with its demand messages blocked, so its rebalance cannot complete yet.
+ TestRecordingCommunicationSpi commSpi1 = startNodeWithBlockingRebalance(getTestIgniteInstanceName(1));
+
+ commSpi1.waitForBlocked();
+
+ IgniteInternalFuture<Boolean>[] futs = getAllRebalanceFutures(ignite0);
+
+ int previousCaches = ignite0.cacheNames().size();
+
+ for (int i = 0; i < 3; i++) {
+ ignite0.createCache(DYNAMIC_CACHE_NAME);
+
+ assertEquals(previousCaches + 1, ignite0.cacheNames().size());
+
+ ignite0.destroyCache(DYNAMIC_CACHE_NAME);
+
+ assertEquals(previousCaches, ignite0.cacheNames().size());
+ }
+ // Cache start/stop must not have completed the blocked rebalance futures.
+ for (IgniteInternalFuture<Boolean> fut : futs)
+ assertFalse(futInfoString(fut), fut.isDone());
+
+ commSpi1.stopBlock();
+
+ awaitPartitionMapExchange();
+
+ for (IgniteInternalFuture<Boolean> fut : futs)
+ assertTrue(futInfoString(fut), fut.isDone() && fut.get());
+ }
+
+    /**
+     * Blocks rebalance on a restarted node, then makes a non-baseline node leave (or fail) and rejoin three times.
+     * Rebalance futures of persistent cache groups must stay uncompleted until the block is removed.
+     *
+     * @param persistence Persistent flag.
+     * @param addtiotionalRegion Use additional (non default) region.
+     * @param fail If {@code true} the node is additionally failed through the discovery SPI before it is closed.
+     * @throws Exception If failed.
+     */
+    public void testRebalanceNoneBltNode(boolean persistence, boolean addtiotionalRegion,
+        boolean fail) throws Exception {
+        persistenceEnabled = persistence;
+        addtiotionalMemRegion = addtiotionalRegion;
+
+        IgniteEx ignite0 = startGrids(NODES_CNT);
+
+        ignite0.cluster().active(true);
+
+        ignite0.cluster().baselineAutoAdjustEnabled(false);
+
+        IgniteEx newNode = startGrid(NODES_CNT);
+
+        grid(1).close();
+
+        for (String cache : ignite0.cacheNames())
+            loadData(ignite0, cache);
+
+        awaitPartitionMapExchange();
+
+        TestRecordingCommunicationSpi commSpi1 = startNodeWithBlockingRebalance(getTestIgniteInstanceName(1));
+
+        commSpi1.waitForBlocked();
+
+        IgniteInternalFuture<Boolean>[] futs = getAllRebalanceFutures(ignite0);
+
+        for (int i = 0; i < 3; i++) {
+            // The node is closed in both modes; in the failure mode it is first failed through discovery.
+            if (fail)
+                ignite0.configuration().getDiscoverySpi().failNode(newNode.localNode().id(), "Fail node by test.");
+
+            newNode.close();
+
+            checkTopology(NODES_CNT);
+
+            newNode = startGrid(NODES_CNT);
+
+            checkTopology(NODES_CNT + 1);
+        }
+
+        // Only futures of persistent cache groups are asserted, both here and after unblocking.
+        for (IgniteInternalFuture<Boolean> fut : futs) {
+            CacheGroupContext grp = U.field(fut, "grp");
+
+            if (CU.isPersistentCache(grp.config(), ignite0.configuration().getDataStorageConfiguration()))
+                assertFalse(futInfoString(fut), fut.isDone());
+        }
+
+        commSpi1.stopBlock();
+
+        awaitPartitionMapExchange();
+
+        for (IgniteInternalFuture<Boolean> fut : futs) {
+            CacheGroupContext grp = U.field(fut, "grp");
+
+            if (CU.isPersistentCache(grp.config(), ignite0.configuration().getDataStorageConfiguration()))
+                assertTrue(futInfoString(fut), fut.isDone() && fut.get());
+        }
+    }
+
+ /**
+ * Blocks rebalance on a restarted node, then restarts a node excluded by the cache node filter three times.
+ *
+ * @param persistence Persistent flag.
+ * @param addtiotionalRegion Use additional (non default) region.
+ * @throws Exception If failed.
+ */
+ public void testRebalanceFilteredNode(boolean persistence, boolean addtiotionalRegion) throws Exception {
+ persistenceEnabled = persistence;
+ addtiotionalMemRegion = addtiotionalRegion;
+ filterNode = true;
+
+ IgniteEx ignite0 = startGrids(NODES_CNT);
+ IgniteEx filteredNode = startGrid(getTestIgniteInstanceName(NODES_CNT) + FILTERED_NODE_SUFFIX);
+
+ ignite0.cluster().active(true);
+
+ grid(1).close();
+
+ for (String cache : ignite0.cacheNames())
+ loadData(ignite0, cache);
+
+ awaitPartitionMapExchange();
+ // Restart node 1 with its demand messages blocked, so its rebalance cannot complete yet.
+ TestRecordingCommunicationSpi commSpi1 = startNodeWithBlockingRebalance(getTestIgniteInstanceName(1));
+
+ commSpi1.waitForBlocked();
+
+ IgniteInternalFuture<Boolean>[] futs = getAllRebalanceFutures(ignite0);
+
+ for (int k = 0; k < 3; k++) {
+ filteredNode.close();
+
+ checkTopology(NODES_CNT);
+
+ filteredNode = startGrid(getTestIgniteInstanceName(NODES_CNT) + FILTERED_NODE_SUFFIX);
+ }
+ // Restarts of the filtered node must not have completed the blocked rebalance futures.
+ for (IgniteInternalFuture<Boolean> fut : futs)
+ assertFalse(futInfoString(fut), fut.isDone());
+
+ commSpi1.stopBlock();
+
+ awaitPartitionMapExchange();
+
+ for (IgniteInternalFuture<Boolean> fut : futs)
+ assertTrue(futInfoString(fut), fut.isDone() && fut.get());
+ }
+
+    /**
+     * Collects rebalance futures of node 1 (the node started with blocked demand messages) for every cache
+     * known to the specified instance, asserting that none of them is completed yet.
+     *
+     * @param ignite Ignite instance that supplies the cache names; futures are always read from {@code grid(1)}.
+     * @return Array of rebalance futures, one per cache name.
+     */
+    @SuppressWarnings("unchecked")
+    private IgniteInternalFuture<Boolean>[] getAllRebalanceFutures(IgniteEx ignite) {
+        // Read the names once so the array size and the loop cannot disagree.
+        String[] cacheNames = ignite.cacheNames().toArray(new String[0]);
+
+        IgniteInternalFuture<Boolean>[] futs = new IgniteInternalFuture[cacheNames.length];
+
+        for (int i = 0; i < cacheNames.length; i++) {
+            futs[i] = grid(1).context().cache().cacheGroup(CU.cacheId(cacheNames[i])).preloader().rebalanceFuture();
+
+            assertFalse(futInfoString(futs[i]), futs[i].isDone());
+        }
+        return futs;
+    }
+
+    /**
+     * Builds a diagnostic description of a rebalance future for assertion messages.
+     * @param rebalanceFuture Rebalance future.
+     * @return Future itself, its completion flag and its result ({@code "None"} while it is not done).
+     */
+    private String futInfoString(IgniteInternalFuture<Boolean> rebalanceFuture) {
+        StringBuilder info = new StringBuilder("Fut: ").append(rebalanceFuture);
+        info.append(" is done: ").append(rebalanceFuture.isDone());
+        info.append(" result: ").append(rebalanceFuture.isDone() ? rebalanceFuture.result() : "None");
+        return info.toString();
+    }
+
+    /**
+     * Loads 100 entries (keys 0..99, values taken from {@link System#nanoTime()}) into the specified cache.
+     *
+     * @param ignite Ignite.
+     * @param cacheName Cache name.
+     */
+    private void loadData(Ignite ignite, String cacheName) {
+        try (IgniteDataStreamer<Integer, Long> streamer = ignite.dataStreamer(cacheName)) {
+            streamer.allowOverwrite(true);
+
+            for (int key = 0; key < 100; key++)
+                streamer.addData(key, System.nanoTime());
+        }
+    }
+
+    /**
+     * Starts a node whose communication SPI holds back demand messages of the default and in-memory caches.
+     *
+     * @param name Node instance name.
+     * @return Test communication SPI.
+     * @throws Exception If failed.
+     */
+    private TestRecordingCommunicationSpi startNodeWithBlockingRebalance(String name) throws Exception {
+        IgniteConfiguration nodeCfg = optimize(getConfiguration(name));
+
+        TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)nodeCfg.getCommunicationSpi();
+
+        spi.blockMessages((node, msg) -> {
+            if (!(msg instanceof GridDhtPartitionDemandMessage))
+                return false;
+
+            GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+            int grpId = demandMsg.groupId();
+
+            // Demands of groups other than the two test caches pass through.
+            if (grpId != CU.cacheId(DEFAULT_CACHE_NAME) && grpId != CU.cacheId(MEM_REGOIN_CACHE))
+                return false;
+
+            info("Message was caught: " + msg.getClass().getSimpleName()
+                + " rebalanceId = " + U.field(demandMsg, "rebalanceId")
+                + " to: " + node.consistentId()
+                + " by cache id: " + grpId);
+
+            return true;
+        });
+
+        startGrid(nodeCfg);
+
+        return spi;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java
new file mode 100644
index 0000000..b67e055
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
+
+/**
+ * Test creates two exchanges at the same moment so that, while the first one is executing, the second is already queued.
+ */
+@WithSystemProperty(key = IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION, value = "false")
+public class PendingExchangeTest extends GridCommonAbstractTest {
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        CacheConfiguration dfltCacheCfg = new CacheConfiguration(DEFAULT_CACHE_NAME).setBackups(1);
+
+        return super.getConfiguration(igniteInstanceName).setCacheConfiguration(dfltCacheCfg);
+    }
+
+ /** {@inheritDoc} Stops all grids before delegating to the parent implementation. */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ super.beforeTest();
+ }
+
+ /** {@inheritDoc} Stops all grids before delegating to the parent implementation. */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * Starts 20 caches asynchronously with the affinity history size limited to 2, so that the history is exhausted.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_AFFINITY_HISTORY_SIZE, value = "2")
+ public void testWithShortAfinityHistory() throws Exception {
+ createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+ GridCompoundFuture compFut = new GridCompoundFuture();
+
+ for (int i = 0; i < 20; i++) {
+ int finalNum = i;
+
+ compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new" + finalNum)));
+ }
+
+ compFut.markInitialized();
+
+ waitForExchnagesBegin(exchangeManager, 20);
+
+ return compFut;
+ });
+ }
+
+ /**
+ * Starts several clients at the same moment and then one cache, which leads to pending client exchanges.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartSeveralClients() throws Exception {
+ createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+ GridCompoundFuture compFut = new GridCompoundFuture();
+
+ for (int i = 0; i < 5; i++) {
+ int finalNum = i;
+
+ compFut.add(GridTestUtils.runAsync(() -> startClientGrid("new_client" + finalNum)));
+ }
+
+ // Explicitly wait until the client exchanges are laid on the exchange queue before the cache start exchange.
+ waitForExchnagesBegin(exchangeManager, 5);
+
+ compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new")));
+
+ compFut.markInitialized();
+
+ waitForExchnagesBegin(exchangeManager, 6);
+
+ return compFut;
+ });
+ }
+
+ /**
+ * Checks that a pending exchange caused by an asynchronous cache start leads to a stable topology.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartCachePending() throws Exception {
+ createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) ->
+ GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new")));
+ }
+
+ /**
+ * Creates and destroys a cache through two asynchronous tasks.
+ * NOTE(review): both tasks are submitted independently, so their relative order is not enforced here.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStopStartCachePending() throws Exception {
+ createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+ GridCompoundFuture compFut = new GridCompoundFuture();
+
+ compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new")));
+ compFut.add(GridTestUtils.runAsync(() -> ignite.destroyCache(DEFAULT_CACHE_NAME + "_new")));
+
+ compFut.markInitialized();
+
+ waitForExchnagesBegin(exchangeManager, 1);
+
+ return compFut;
+ });
+ }
+
+ /**
+ * Creates several caches and destroys them, each operation in its own asynchronous task.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStopStartSeveralCachePending() throws Exception {
+ createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+ GridCompoundFuture compFut = new GridCompoundFuture();
+
+ for (int i = 0; i < 5; i++) {
+ int finalNum = i;
+
+ compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME +
+ "_new" + finalNum)));
+ compFut.add(GridTestUtils.runAsync(() -> ignite.destroyCache(DEFAULT_CACHE_NAME +
+ "_new" + finalNum)));
+ }
+
+ compFut.markInitialized();
+
+ waitForExchnagesBegin(exchangeManager, 5);
+
+ return compFut;
+ });
+ }
+
+ /**
+ * Starts a server node and creates a cache, each in its own asynchronous task.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartServerAndCache() throws Exception {
+ createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+ GridCompoundFuture compFut = new GridCompoundFuture();
+
+ compFut.add(GridTestUtils.runAsync(() -> startGrid("new_srv")));
+ compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new")));
+
+ compFut.markInitialized();
+
+ waitForExchnagesBegin(exchangeManager, 2);
+
+ return compFut;
+ });
+ }
+
+ /**
+ * Starts three servers, three clients and three caches, each in its own asynchronous task.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartServalServersWithClisntAndCache() throws Exception {
+ createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+ GridCompoundFuture compFut = new GridCompoundFuture();
+
+ for (int i = 0; i < 3; i++) {
+ int finalSrvNum = i;
+
+ compFut.add(GridTestUtils.runAsync(() -> startGrid("new_srv" + finalSrvNum)));
+ compFut.add(GridTestUtils.runAsync(() -> startClientGrid("new_client" + finalSrvNum)));
+ compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new" + finalSrvNum)));
+ }
+
+ compFut.markInitialized();
+
+ waitForExchnagesBegin(exchangeManager, 9);
+
+ return compFut;
+ });
+ }
+
+ /**
+ * Triggers asynchronous start and stop of two servers and two clients while a blocked exchange is pending.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartStopServalServersWithClisnt() throws Exception {
+ createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+ GridCompoundFuture compFut = new GridCompoundFuture();
+
+ for (int i = 0; i < 2; i++) {
+ int finalSrvNum = i;
+
+ compFut.add(GridTestUtils.runAsync(() -> startGrid("new_srv" + finalSrvNum)));
+ compFut.add(GridTestUtils.runAsync(() -> startClientGrid("new_client" + finalSrvNum)));
+
+ compFut.add(GridTestUtils.runAsync(() -> stopGrid("new_srv" + finalSrvNum)));
+ compFut.add(GridTestUtils.runAsync(() -> stopGrid("new_client" + finalSrvNum)));
+ }
+
+ compFut.markInitialized();
+
+ waitForExchnagesBegin(exchangeManager, 4);
+
+ return compFut;
+ });
+ }
+
+ /**
+ * Waits up to 30 seconds until the exchange worker queue holds the given number of exchange futures.
+ * @param exchangeManager Exchange manager whose {@code exchWorker} queue ({@code futQ}) is inspected.
+ * @param exchanges Minimal number of queued {@link GridDhtPartitionsExchangeFuture} tasks to wait for.
+ */
+ private void waitForExchnagesBegin(GridCachePartitionExchangeManager exchangeManager, int exchanges) {
+ GridWorker exchWorker = U.field(exchangeManager, "exchWorker");
+ Queue<CachePartitionExchangeWorkerTask> exchnageQueue = U.field(exchWorker, "futQ");
+
+ try {
+ assertTrue(GridTestUtils.waitForCondition(() -> {
+ int exFuts = 0;
+
+ for (CachePartitionExchangeWorkerTask task : exchnageQueue) {
+ if (task instanceof GridDhtPartitionsExchangeFuture)
+ exFuts++;
+ }
+
+ return exFuts >= exchanges;
+ }, 30_000));
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ fail("Can’t wait for the exchnages beginning.");
+ }
+ }
+
+ /**
+ * @param clo Closure triggering exchange(s) while the exchange caused by stopping node 2 is blocked on node 1.
+ * @throws Exception If failed.
+ */
+ private void createClusterWithPendingExchnageDuringRebalance(PendingExchangeTrigger clo) throws Exception {
+ IgniteEx ignite0 = startGrids(3);
+
+ try (IgniteDataStreamer streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
+ for (int i = 0; i < 1000; i++)
+ streamer.addData(i, i);
+ }
+
+ awaitPartitionMapExchange();
+
+ GridCachePartitionExchangeManager exchangeManager1 = ignite(1).context().cache().context().exchange();
+
+ CountDownLatch exchangeLatch = new CountDownLatch(1);
+
+ AffinityTopologyVersion readyTop = exchangeManager1.readyAffinityVersion();
+
+ exchangeManager1.registerExchangeAwareComponent(new PartitionsExchangeAware() {
+ @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ U.awaitQuiet(exchangeLatch);
+ }
+ });
+
+ IgniteInternalFuture startNodeFut = GridTestUtils.runAsync(() -> stopGrid(2));
+
+ assertTrue(GridTestUtils.waitForCondition(() ->
+ exchangeManager1.lastTopologyFuture().initialVersion().after(readyTop), 10_000));
+
+ IgniteInternalFuture exchangeTrigger = clo.trigger(ignite0, exchangeManager1);
+
+ assertTrue(GridTestUtils.waitForCondition(exchangeManager1::hasPendingServerExchange, 10_000));
+
+ exchangeLatch.countDown();
+
+ startNodeFut.get(10_000);
+ exchangeTrigger.get(10_000);
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * Closure interface used by the tests to trigger additional exchanges while one is pending.
+ */
+ private static interface PendingExchangeTrigger {
+
+ /**
+ * Invoke exchange.
+ *
+ * @param ignite Ignite.
+ * @param exchangeManager Exchange manager.
+ * @return Future representing the triggered operation(s).
+ */
+ IgniteInternalFuture trigger(Ignite ignite, GridCachePartitionExchangeManager exchangeManager);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
index 80ca5a8..1d703cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
@@ -341,7 +341,7 @@ public class IgniteCacheConfigurationTemplateTest extends GridCommonAbstractTest
evt = evtLatch.await(3000, TimeUnit.MILLISECONDS);
- assertTrue(evt);
+ assertFalse(evt);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 4427447..3e85257 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -1329,7 +1329,9 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
});
- checkAffinity(cnt, topVer(ord - 1, 1), true);
+ AffinityTopologyVersion currentTop = ignite(0).context().cache().context().exchange().readyAffinityVersion();
+
+ checkAffinity(cnt, currentTop, true);
stopNode(stopId, ord);
@@ -1508,7 +1510,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
for (int i = 0; i < NODES; i++) {
TestRecordingCommunicationSpi spi =
- (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+ (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode node, Message msg) {
@@ -1524,8 +1526,12 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
- while (!joined.get())
- U.sleep(10);
+ for (int j = 1; j < NODES; j++) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite(j).configuration().getCommunicationSpi();
+
+ spi.waitForBlocked();
+ }
for (int i = 0; i < NODES; i++)
stopGrid(getTestIgniteInstanceName(i), false, false);
@@ -2147,6 +2153,30 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
for (int i = 0; i < ITERATIONS; i++) {
log.info("Iteration: " + i);
+ TestRecordingCommunicationSpi[] testSpis = new TestRecordingCommunicationSpi[NODES];
+
+ for (int j = 0; j < NODES; j++) {
+ testSpis[j] = new TestRecordingCommunicationSpi();
+
+ testSpis[j].blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+ }
+
+ //Ensure exchanges merge.
+ spiC = igniteInstanceName -> testSpis[getTestIgniteInstanceIndex(igniteInstanceName)];
+
+ GridTestUtils.runAsync(() -> {
+ try {
+ for (int j = 1; j < NODES; j++)
+ testSpis[j].waitForBlocked();
+ }
+ catch (InterruptedException e) {
+ log.error("Thread interrupted.", e);
+ }
+
+ for (TestRecordingCommunicationSpi testSpi : testSpis)
+ testSpi.stopBlock();
+ });
+
startGridsMultiThreaded(NODES);
for (int t = 0; t < NODES; t++)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
index a8c185c..18e4aff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -75,7 +75,7 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
cfg.setCommunicationSpi(new TestCommunicationSpi());
- cfg.setIncludeEventTypes(EventType.EVTS_ALL);
+ cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED);
return cfg;
}
@@ -156,11 +156,11 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
Ignite ignite1 = startClientGrid(1);
- assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+ assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
ignite1.close();
- assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+ assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
ignite1 = startClientGrid(1);
@@ -176,11 +176,13 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
}
}, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED);
- assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+ assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
startGrid(2);
- assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+ awaitPartitionMapExchange();
+
+ assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
assertFalse(evtLatch1.await(1000, TimeUnit.MILLISECONDS));
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictPartitionInLogTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictPartitionInLogTest.java
index 15d4eba..9d6c86d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictPartitionInLogTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictPartitionInLogTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
@@ -61,6 +62,9 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
/** Cache names. */
private static final String[] DEFAULT_CACHE_NAMES = {DEFAULT_CACHE_NAME + "0", DEFAULT_CACHE_NAME + "1"};
+ /** Cache's backups. */
+ public int backups = 0;
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
@@ -88,7 +92,7 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
.map(cacheName ->
new CacheConfiguration<>(cacheName)
.setGroupName(cacheName)
- .setBackups(0)
+ .setBackups(backups)
.setAffinity(new RendezvousAffinityFunction(false, 12))
.setIndexedTypes(Integer.class, Integer.class)
).toArray(CacheConfiguration[]::new)
@@ -96,8 +100,7 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
}
/**
- * Test checks the presence of evicted partitions (RENTING state) in log
- * without duplicate partitions.
+ * Test checks the presence of evicted partitions (RENTING state) in log without duplicate partitions.
*
* @throws Exception If failed.
*/
@@ -128,14 +131,19 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
}
/**
- * Test checks the presence of evicted partitions (MOVING state) in log
- * without duplicate partitions.
+ * Test checks the presence of evicted partitions (MOVING state) in log without duplicate partitions.
*
* @throws Exception If failed.
*/
@Test
public void testEvictPartByMovingState() throws Exception {
- IgniteEx node = startGrid(0);
+ backups = 1;
+
+ IgniteEx node = startGrids(3);
+ awaitPartitionMapExchange();
+
+ stopGrid(2);
+
awaitPartitionMapExchange();
Map<Integer, Collection<Integer>> parseParts = new ConcurrentHashMap<>();
@@ -162,7 +170,12 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
.collect(toList());
parts.subList(0, parts.size() - 1).forEach(GridDhtLocalPartition::clearAsync);
- rebFuts.forEach(rebFut -> rebFut.onDone(Boolean.TRUE));
+
+ rebFuts.forEach(rebFut -> {
+ GridTestUtils.setFieldValue(rebFut, "next", null);
+
+ rebFut.onDone(Boolean.TRUE);
+ });
doSleep(100);
rebFuts.forEach(GridFutureAdapter::reset);
@@ -227,24 +240,27 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
.map(cacheName -> "grpId=" + CU.cacheId(cacheName) + ", grpName=" + cacheName)
.collect(toList());
- Pattern extractParts = Pattern.compile(reason + "=\\[([0-9\\-,]*)]]");
+ Pattern extractParts = Pattern.compile(reason + "=\\[([0-9\\-,]*)]");
Pattern extractGrpId = Pattern.compile("grpId=([0-9]*)");
LogListener.Builder builder = LogListener.matches(logStr -> {
- if (logStr.contains("Partitions have been scheduled for eviction:")) {
- Matcher grpIdMatcher = extractGrpId.matcher(logStr);
- Matcher partsMatcher = extractParts.matcher(logStr);
+ String msgPrefix = "Partitions have been scheduled for eviction:";
+ if (!logStr.contains(msgPrefix))
+ return false;
+
+ of(logStr.replace(msgPrefix, "").split("], \\[")).forEach(s -> {
+
+ Matcher grpIdMatcher = extractGrpId.matcher(s);
+ Matcher partsMatcher = extractParts.matcher(s);
//find and parsing grpId and partitions
- while (grpIdMatcher.find() && partsMatcher.find()) {
+ if (grpIdMatcher.find() && partsMatcher.find()) {
evictParts.computeIfAbsent(parseInt(grpIdMatcher.group(1)), i -> new ConcurrentLinkedQueue<>())
.addAll(parseContentCompactStr(partsMatcher.group(1)));
}
+ });
- return cacheInfos.stream().allMatch(logStr::contains);
- }
- else
- return false;
+ return cacheInfos.stream().allMatch(logStr::contains);
});
return builder.build();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index 75476e6..77ebd3b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -519,9 +519,6 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
// Await fully exchange complete.
awaitExchange(newIgnite);
- for (Ignite g : G.allGrids())
- g.cache(DEFAULT_CACHE_NAME).rebalance();
-
assertFalse(grpCtx.walEnabled());
// TODO : test with client node as well
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 597ff2b..02bbf6d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -36,7 +36,6 @@ import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -82,6 +81,9 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
/** Block message predicate to set to Communication SPI in node configuration. */
private IgniteBiPredicate<ClusterNode, Message> blockMessagePredicate;
+ /** */
+ private int backups;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); //to make all rebalance wal-based
@@ -93,20 +95,22 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setRebalanceMode(CacheRebalanceMode.ASYNC)
- .setCacheMode(CacheMode.REPLICATED)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setBackups(backups)
.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
cfg.setCacheConfiguration(ccfg);
DataStorageConfiguration dbCfg = new DataStorageConfiguration()
- .setWalHistorySize(Integer.MAX_VALUE)
- .setWalMode(WALMode.LOG_ONLY)
- .setCheckpointFrequency(15 * 60 * 1000)
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration()
- .setPersistenceEnabled(true)
- .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
- );
+ .setWalSegmentSize(4 * 1024 * 1024)
+ .setWalHistorySize(Integer.MAX_VALUE)
+ .setWalMode(WALMode.LOG_ONLY)
+ .setCheckpointFrequency(15 * 60 * 1000)
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
+ );
cfg.setDataStorageConfiguration(dbCfg);
@@ -153,6 +157,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
*/
@Test
public void testSimple() throws Exception {
+ backups = 4;
+
IgniteEx ig0 = startGrid(0);
IgniteEx ig1 = startGrid(1);
@@ -193,6 +199,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalanceRemoves() throws Exception {
+ backups = 4;
+
IgniteEx ig0 = startGrid(0);
IgniteEx ig1 = startGrid(1);
@@ -241,9 +249,11 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
*/
@Test
public void testWithLocalWalChange() throws Exception {
+ backups = 4;
+
System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, "true");
- IgniteEx crd = (IgniteEx) startGrids(4);
+ IgniteEx crd = startGrids(4);
crd.cluster().active(true);
@@ -260,7 +270,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
stopAllGrids();
- IgniteEx ig0 = (IgniteEx) startGrids(2);
+ IgniteEx ig0 = startGrids(2);
ig0.cluster().active(true);
@@ -331,6 +341,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
*/
@Test
public void testWithGlobalWalChange() throws Exception {
+ backups = 4;
+
// Prepare some data.
IgniteEx crd = (IgniteEx) startGrids(3);
@@ -411,6 +423,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
*/
@Test
public void testRebalanceCancelOnSupplyError() throws Exception {
+ backups = 4;
+
// Prepare some data.
IgniteEx crd = (IgniteEx) startGrids(3);
@@ -459,7 +473,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
final GridCachePreloader preloader = demanderNode.cachex(CACHE_NAME).context().group().preloader();
GridTestUtils.waitForCondition(() ->
- ((GridDhtPartitionDemander.RebalanceFuture) preloader.rebalanceFuture()).topologyVersion().equals(curTopVer),
+ ((GridDhtPartitionDemander.RebalanceFuture)preloader.rebalanceFuture()).topologyVersion().equals(curTopVer),
getTestTimeout()
);
@@ -500,71 +514,6 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
}
/**
- * Check that historical rebalance doesn't start on the cleared partition when some cluster node restarts.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testRebalanceRestartWithNodeBlinking() throws Exception {
- int entryCnt = PARTS_CNT * 200;
-
- // Start 3 nodes cluster:
- // node0 - coordinator (main supplier for historical rebalance)
- // node1 - some node that will generate NODE_LEFT/NODE_JOINED events
- // node2 - historical rebalance demander
- IgniteEx crd = (IgniteEx)startGridsMultiThreaded(3);
-
- crd.cluster().state(ClusterState.ACTIVE);
- crd.cluster().baselineAutoAdjustEnabled(false);
-
- IgniteCache<Integer, String> cache0 = crd.cache(CACHE_NAME);
-
- for (int i = 0; i < entryCnt / 2; i++)
- cache0.put(i, String.valueOf(i));
-
- forceCheckpoint();
-
- stopGrid(2);
-
- for (int i = entryCnt / 2; i < entryCnt; i++)
- cache0.put(i, String.valueOf(i));
-
- blockMessagePredicate = (node, msg) -> {
- if (msg instanceof GridDhtPartitionDemandMessage) {
- GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
-
- return msg0.groupId() == CU.cacheId(CACHE_NAME) && msg0.partitions().size() == PARTS_CNT;
- }
-
- return false;
- };
-
- startGrid(2);
-
- TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
-
- // Wait until node2 starts historical rebalancning.
- spi2.waitForBlocked(1);
-
- // Interruption of rebalancing by NODE_LEFT event, historical supplier should not be provided.
- stopGrid(1);
-
- // Wait until the full rebalance begins.
- spi2.waitForBlocked(2);
-
- // Interrupting it again by NODE_JOINED and get a historical supplier again.
- startGrid(1);
-
- spi2.stopBlock();
-
- awaitPartitionMapExchange();
-
- // Verify data on demander node.
- for (int i = 0; i < entryCnt; i++)
- assertEquals(String.valueOf(i), grid(2).cache(CACHE_NAME).get(i));
- }
-
- /**
*
*/
private static class IndexedObject {
@@ -666,6 +615,72 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
}
/**
+ * Check that historical rebalance doesn't start on the cleared partition when some cluster node restarts.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRebalanceRestartWithNodeBlinking() throws Exception {
+ backups = 2;
+
+ int entryCnt = PARTS_CNT * 200;
+
+ IgniteEx crd = (IgniteEx)startGridsMultiThreaded(3);
+
+ crd.cluster().active(true);
+
+ IgniteCache<Integer, String> cache0 = crd.cache(CACHE_NAME);
+
+ for (int i = 0; i < entryCnt / 2; i++)
+ cache0.put(i, String.valueOf(i));
+
+ forceCheckpoint();
+
+ stopGrid(2);
+
+ for (int i = entryCnt / 2; i < entryCnt; i++)
+ cache0.put(i, String.valueOf(i));
+
+ blockMessagePredicate = (node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+ return msg0.groupId() == CU.cacheId(CACHE_NAME);
+ }
+
+ return false;
+ };
+
+ startGrid(2);
+
+ TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
+
+ // Wait until node 2 starts historical rebalancing (its first demand message is blocked).
+ spi2.waitForBlocked(1);
+
+ // Interruption of rebalancing by left supplier, should remap to new supplier with full rebalancing.
+ stopGrid(0);
+
+ // Wait until the full rebalance begins with node 1 as a supplier.
+ spi2.waitForBlocked(2);
+
+ blockMessagePredicate = null;
+
+ startGrid(0); // Should not force rebalancing remap.
+
+ startGrid(4);
+ resetBaselineTopology(); // Should force rebalancing remap.
+
+ spi2.waitForBlocked(3);
+ spi2.stopBlock();
+
+ awaitPartitionMapExchange();
+
+ // Verify partition consistency for the cache with idle verify run from node 0.
+ assertPartitionsSame(idleVerify(grid(0), CACHE_NAME));
+ }
+
+ /**
*
*/
static class FailingIOFactory implements FileIOFactory {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index eba9811..f4c7fc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -889,7 +889,12 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
for (Ignite grid : G.allGrids()) {
TestRecordingCommunicationSpi.spi(grid)
- .blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+ .blockMessages((node, msg) -> {
+ if (msg instanceof GridDhtPartitionsSingleMessage)
+ return ((GridDhtPartitionsSingleMessage)msg).exchangeId() != null;
+
+ return false;
+ });
}
IgniteFuture<Void> fut = grid(1).snapshot().createSnapshot(SNAPSHOT_NAME);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
index a2e63cb..1d94ed1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -181,7 +180,7 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends GridCommonAbstractTest
GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)m;
// Allow full rebalance for cache 1 and system cache.
- if (msg.groupId() == CU.cacheId(CACHE1) || msg.groupId() == CU.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
+ if (msg.groupId() != CU.cacheId(CACHE2))
return false;
// Allow only first batch for cache 2.
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheAffinityVersionTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheAffinityVersionTask.java
new file mode 100644
index 0000000..8eb27c6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheAffinityVersionTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Task that returns the ready affinity topology version of a cache, computed by a job mapped to the local node.
+ */
+public class PlatformCacheAffinityVersionTask extends ComputeTaskAdapter<String, Object> {
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ String arg) throws IgniteException {
+
+ return Collections.singletonMap(new Job(arg), F.find(subgrid, null,
+ (IgnitePredicate<? super ClusterNode>)ClusterNode::isLocal));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
+ return results.get(0).getData();
+ }
+
+ /**
+ * Job returning a two-element {@code Long[]}: the topology version and the minor topology version of the cache group.
+ */
+ private static class Job extends ComputeJobAdapter {
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ protected transient Ignite ignite;
+
+ /**
+ * Name of the cache whose group topology is queried.
+ */
+ private String cacheName;
+
+ /**
+ * @param cacheName Cache name.
+ */
+ public Job(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ AffinityTopologyVersion topVer = ((IgniteEx)ignite).context().cache().cache(cacheName).context().group()
+ .topology().readyTopologyVersion();
+
+ return new Long[] {topVer.topologyVersion(), (long)topVer.minorTopologyVersion()};
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index bd52959..74e78a5 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -710,7 +710,6 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
GridDhtLocalPartition loc = top.localPartition(p, readyVer, false);
boolean notPrimary = !affNodes.isEmpty() &&
- !exchMgr.rebalanceTopologyVersion().equals(AffinityTopologyVersion.NONE) &&
!affNodes.get(0).equals(dht.context().affinity().primaryByPartition(p, readyVer));
if (affNodesCnt != ownerNodesCnt || !affNodes.containsAll(owners) ||
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 4898008..a1d9902 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.cache.affinity.PendingExchangeTest;
import org.apache.ignite.internal.processors.cache.CacheIgniteOutOfMemoryExceptionTest;
import org.apache.ignite.internal.processors.cache.CacheNoAffinityExchangeTest;
import org.apache.ignite.internal.processors.cache.ClientFastReplyCoordinatorFailureTest;
@@ -90,6 +91,7 @@ public class IgniteCacheTestSuite6 {
GridTestUtils.addTestIfNeeded(suite, IgnitePessimisticTxSuspendResumeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, PendingExchangeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, ExchangeMergeStaleServerNodesTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, ClientFastReplyCoordinatorFailureTest.class, ignoredTests);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 77492a0..d824cd0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -20,7 +20,10 @@ package org.apache.ignite.testsuites;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.cache.BreakRebalanceChainTest;
+import org.apache.ignite.cache.NotOptimizedRebalanceTest;
import org.apache.ignite.cache.RebalanceCompleteDuringExchangeTest;
+import org.apache.ignite.cache.RebalanceCancellationTest;
import org.apache.ignite.cache.ResetLostPartitionTest;
import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
@@ -78,6 +81,9 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheWalDisabledOnRebalancingTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsStartWIthEmptyArchive.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CorruptedTreeFailureHandlingTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, RebalanceCancellationTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, NotOptimizedRebalanceTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, BreakRebalanceChainTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
index 35aff9e..2c672e1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@ -102,6 +102,8 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
final IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
assert cache != null;
for (int i = 0; i < KEY_CNT; i++)
@@ -168,7 +170,7 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
info("Awaiting rebalance events [restartCnt=" + restartCnt.get() + ']');
- boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000);
+ boolean success = lsnr.awaitEvents(countRebalances(GRID_CNT, restartCnt.get()), 15000);
for (int i = 0; i < GRID_CNT; i++)
grid(i).events().stopLocalListen(lsnr, EventType.EVT_CACHE_REBALANCE_STOPPED);
@@ -177,14 +179,22 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
}
/**
+ * Calculates the count of rebalances that will be stopped.
+ *
+ * @param nodes Count of nodes in the cluster.
+ * @param restarts Count of restarts of a separate node that have happened.
+ * @return Count of rebalance events which will be triggered.
+ */
+ protected int countRebalances(int nodes, int restarts) {
+ return nodes * restarts;
+ }
+
+ /**
*
*/
protected IgniteInternalFuture createRestartAction(final AtomicBoolean done, final AtomicInteger restartCnt) throws Exception {
return multithreadedAsync(new Callable<Object>() {
/** */
- private final long nodeLifeTime = 2 * 1000;
-
- /** */
private final int logFreq = 50;
@SuppressWarnings({"BusyWait"})
@@ -194,10 +204,12 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
startGrid(idx);
- Thread.sleep(nodeLifeTime);
+ resetBaselineTopology();
stopGrid(idx);
+ resetBaselineTopology();
+
int c = restartCnt.incrementAndGet();
if (c % logFreq == 0)
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java
index c6e9e82..8acb09f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java
@@ -97,11 +97,12 @@ public class IgniteChangingBaselineCacheQueryNodeRestartSelfTest extends IgniteC
lastOpChangeUp = true;
}
- grid(0).cluster().setBaselineTopology(baselineNodes(grid(0).cluster().forServers().nodes()));
+ resetBaselineTopology();
Thread.sleep(baselineTopChangeInterval);
- int c = restartCnt.incrementAndGet();
+ // Only stopping a node triggers a rebalance.
+ int c = lastOpChangeUp ? restartCnt.get() : restartCnt.incrementAndGet();
if (c % logFreq == 0)
info("BaselineTopology changes: " + c);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java
index fc8f5dd..24d2b49 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java
@@ -16,9 +16,13 @@
*/
package org.apache.ignite.internal.processors.database.baseline;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest;
/**
@@ -59,6 +63,36 @@ public class IgniteStableBaselineCacheQueryNodeRestartsSelfTest extends IgniteCa
}
/** {@inheritDoc} */
+ @Override protected IgniteInternalFuture createRestartAction(final AtomicBoolean done, final AtomicInteger restartCnt) throws Exception {
+ return multithreadedAsync(new Callable<Object>() {
+ /** */
+ private final int logFreq = 50;
+
+ @SuppressWarnings({"BusyWait"})
+ @Override public Object call() throws Exception {
+ while (!done.get()) {
+ int idx = gridCount();
+
+ startGrid(idx);
+
+ stopGrid(idx);
+
+ int c = restartCnt.incrementAndGet();
+
+ if (c % logFreq == 0)
+ info("Node restarts: " + c);
+ }
+
+ return true;
+ }
+ }, 1, "restart-thread");
+ }
+
+ @Override protected int countRebalances(int nodes, int restarts) {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids();
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
index a3a8979..1e913db 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
@@ -26,10 +26,10 @@ namespace Apache.Ignite.Core.Tests.Compute
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Compute;
- using Apache.Ignite.Core.Events;
using Apache.Ignite.Core.Impl;
using Apache.Ignite.Core.Resource;
using NUnit.Framework;
@@ -67,12 +67,12 @@ namespace Apache.Ignite.Core.Tests.Compute
_grid1 = Ignition.Start(Configuration(configs.Item1));
_grid2 = Ignition.Start(Configuration(configs.Item2));
- _grid3 = Ignition.Start(Configuration(configs.Item3));
- // Wait for rebalance.
- var events = _grid1.GetEvents();
- events.EnableLocal(EventType.CacheRebalanceStopped);
- events.WaitForLocal(EventType.CacheRebalanceStopped);
+ AffinityTopologyVersion waitingTop = new AffinityTopologyVersion(2, 1);
+
+ Assert.True(_grid1.WaitTopology(waitingTop), "Failed to wait topology " + waitingTop);
+
+ _grid3 = Ignition.Start(Configuration(configs.Item3));
// Start thin client.
_igniteClient = Ignition.StartClient(GetThinClientConfiguration());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs
index ca98801..556fbf4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Tests
{
using System.Collections.Generic;
using System.Linq;
+ using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Events;
@@ -38,23 +39,41 @@ namespace Apache.Ignite.Core.Tests
[Test]
public void TestRebalanceEvents()
{
+ ICollection<int> cacheRebalanceStopStartEvts = new[]
+ {
+ EventType.CacheRebalanceStarted,
+ EventType.CacheRebalanceStopped
+ };
+
var listener = new Listener<CacheRebalancingEvent>();
- using (Ignition.Start(GetConfig(listener, EventType.CacheRebalanceAll)))
+ using (IIgnite ignite0 = Ignition.Start(GetConfig(listener, cacheRebalanceStopStartEvts, "TestRebalanceEvents")))
{
- var events = listener.GetEvents();
+ var cache = ignite0.GetOrCreateCache<int, int>(CacheName);
- Assert.AreEqual(2, events.Count);
+ for (int i = 0; i < 2000; i++)
+ cache[i] = i;
+
+ using (IIgnite ignite1 = Ignition.Start(GetConfig(listener, cacheRebalanceStopStartEvts)))
+ {
+ AffinityTopologyVersion afterRebalanceTop = new AffinityTopologyVersion(2, 1);
+
+ Assert.True(ignite1.WaitTopology(afterRebalanceTop, CacheName), "Failed to wait topology " + afterRebalanceTop);
+
+ var events = listener.GetEvents();
+
+ Assert.AreEqual(2, events.Count);
- var rebalanceStart = events.First();
+ var rebalanceStart = events.First();
- Assert.AreEqual(CacheName, rebalanceStart.CacheName);
- Assert.AreEqual(EventType.CacheRebalanceStarted, rebalanceStart.Type);
+ Assert.AreEqual(CacheName, rebalanceStart.CacheName);
+ Assert.AreEqual(EventType.CacheRebalanceStarted, rebalanceStart.Type);
- var rebalanceStop = events.Last();
+ var rebalanceStop = events.Last();
- Assert.AreEqual(CacheName, rebalanceStop.CacheName);
- Assert.AreEqual(EventType.CacheRebalanceStopped, rebalanceStop.Type);
+ Assert.AreEqual(CacheName, rebalanceStop.CacheName);
+ Assert.AreEqual(EventType.CacheRebalanceStopped, rebalanceStop.Type);
+ }
}
}
@@ -131,11 +150,13 @@ namespace Apache.Ignite.Core.Tests
/// <summary>
/// Gets the configuration.
/// </summary>
- private static IgniteConfiguration GetConfig<T>(IEventListener<T> listener, ICollection<int> eventTypes)
+ private static IgniteConfiguration GetConfig<T>(IEventListener<T> listener, ICollection<int> eventTypes,
+ string instanceName = null)
where T : IEvent
{
return new IgniteConfiguration(TestUtils.GetTestConfiguration())
{
+ IgniteInstanceName = instanceName,
LocalEventListeners = new[]
{
new LocalEventListener<T>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs
index fffff90..92f3761 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs
@@ -26,6 +26,7 @@ namespace Apache.Ignite.Core.Tests
using System.Reflection;
using System.Threading;
using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Discovery.Tcp;
using Apache.Ignite.Core.Discovery.Tcp.Static;
@@ -47,6 +48,9 @@ namespace Apache.Ignite.Core.Tests
/** */
public const int DfltBusywaitSleepInterval = 200;
+ /** System cache name. */
+ public const string utilityCacheName = "ignite-sys-cache";
+
/** Work dir. */
private static readonly string WorkDir =
// ReSharper disable once AssignNullToNotNullAttribute
@@ -245,6 +249,45 @@ namespace Apache.Ignite.Core.Tests
}
/// <summary>
+ /// Waits for particular topology on specific cache (system cache by default).
+ /// </summary>
+ /// <param name="grid">Grid.</param>
+ /// <param name="waitingTop">Topology version.</param>
+ /// <param name="cacheName">Cache name.</param>
+ /// <param name="timeout">Timeout.</param>
+ /// <returns>
+ /// <c>True</c> if the topology version reached <paramref name="waitingTop"/> before the timeout; otherwise <c>false</c>.
+ /// </returns>
+ public static bool WaitTopology(this IIgnite grid, AffinityTopologyVersion waitingTop,
+ string cacheName = utilityCacheName, int timeout = 30000)
+ {
+ int checkPeriod = 200;
+
+ // Wait for late affinity.
+ for (var iter = 0;; iter++)
+ {
+ var result = grid.GetCompute().ExecuteJavaTask<long[]>(
+ "org.apache.ignite.platform.PlatformCacheAffinityVersionTask", cacheName);
+ var top = new AffinityTopologyVersion(result[0], (int) result[1]);
+ if (top.CompareTo(waitingTop) >= 0)
+ {
+ Console.Out.WriteLine("Current topology: " + top);
+ break;
+ }
+
+ if (iter % 10 == 0)
+ Console.Out.WriteLine("Waiting topology cur=" + top + " wait=" + waitingTop);
+
+ if (iter * checkPeriod > timeout)
+ return false;
+
+ Thread.Sleep(checkPeriod);
+ }
+
+ return true;
+ }
+
+ /// <summary>
/// Waits for condition, polling in busy wait loop.
/// </summary>
/// <param name="cond">Condition.</param>