Posted to commits@ignite.apache.org by sk...@apache.org on 2020/07/06 10:14:55 UTC
[ignite] branch master updated: IGNITE-13193 Added fallback to full rebalance if historical one has failed.
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fde65a6 IGNITE-13193 Added fallback to full rebalance if historical one has failed.
fde65a6 is described below
commit fde65a6bdaa782108ea50b2bb746a9980aa5b680
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Mon Jul 6 13:14:23 2020 +0300
IGNITE-13193 Added fallback to full rebalance if historical one has failed.
---
.../cache/GridCachePartitionExchangeManager.java | 310 +++++------
.../processors/cache/GridCachePreloader.java | 5 +-
.../cache/GridCachePreloaderAdapter.java | 5 +-
.../dht/preloader/GridDhtPartitionDemander.java | 55 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 91 ++--
.../preloader/GridDhtPartitionsExchangeFuture.java | 74 ++-
.../dht/preloader/GridDhtPreloader.java | 27 +-
...java => IgniteHistoricalIteratorException.java} | 33 +-
.../preloader/RebalanceReassignExchangeTask.java | 15 +-
.../cache/persistence/GridCacheOffheapManager.java | 142 ++---
.../IgniteShutdownOnSupplyMessageFailureTest.java | 17 +
.../persistence/db/wal/IgniteWalRebalanceTest.java | 581 ++++++++++++++++++++-
12 files changed, 1050 insertions(+), 305 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 15aad21..fc8bef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1173,8 +1173,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param exchId Exchange ID.
*/
- public void forceReassign(GridDhtPartitionExchangeId exchId) {
- exchWorker.forceReassign(exchId);
+ public void forceReassign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture fut) {
+ exchWorker.forceReassign(exchId, fut);
}
/**
@@ -2878,9 +2878,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private AffinityTopologyVersion lastFutVer;
- /** Busy flag used as performance optimization to stop current preloading. */
- private volatile boolean busy;
-
/** */
private boolean crd;
@@ -2901,9 +2898,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param exchId Exchange ID.
*/
- void forceReassign(GridDhtPartitionExchangeId exchId) {
- if (!hasPendingExchange() && !busy)
- futQ.add(new RebalanceReassignExchangeTask(exchId));
+ void forceReassign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture fut) {
+ if (!hasPendingExchange())
+ futQ.add(new RebalanceReassignExchangeTask(exchId, fut));
}
/**
@@ -3048,11 +3045,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (!futQ.isEmpty()) {
for (CachePartitionExchangeWorkerTask task : futQ) {
if (task instanceof GridDhtPartitionsExchangeFuture) {
- // First event is enough to check,
- // because only current exchange future can have multiple discovery events (exchange merge).
- ClusterNode triggeredBy = ((GridDhtPartitionsExchangeFuture) task).firstEvent().eventNode();
-
- if (!triggeredBy.isClient())
+ if (((GridDhtPartitionsExchangeFuture)task).changedAffinity())
return true;
}
}
@@ -3173,8 +3166,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
continue;
}
- busy = true;
-
Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
boolean forcePreload = false;
@@ -3185,197 +3176,220 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
AffinityTopologyVersion resVer = null;
- try {
- if (isCancelled())
- break;
+ if (isCancelled())
+ break;
- if (task instanceof RebalanceReassignExchangeTask)
- exchId = ((RebalanceReassignExchangeTask) task).exchangeId();
- else if (task instanceof ForceRebalanceExchangeTask) {
- forcePreload = true;
+ if (task instanceof RebalanceReassignExchangeTask) {
+ RebalanceReassignExchangeTask reassignTask = (RebalanceReassignExchangeTask)task;
- timeout = 0; // Force refresh.
+ exchId = reassignTask.exchangeId();
- exchId = ((ForceRebalanceExchangeTask)task).exchangeId();
- }
- else {
- assert task instanceof GridDhtPartitionsExchangeFuture : task;
+ GridDhtPartitionsExchangeFuture fut = reassignTask.future();
- exchFut = (GridDhtPartitionsExchangeFuture)task;
+ assert fut.changedAffinity() :
+ "Reassignment request started for exchange future which didn't change affinity " +
+ "[exchId=" + exchId + ", fut=" + exchFut + ']';
- exchId = exchFut.exchangeId();
+ if (fut.hasInapplicableNodesForRebalance()) {
+ GridDhtPartitionsExchangeFuture lastFut = lastFinishedFut.get();
- lastInitializedFut = exchFut;
+ AffinityTopologyVersion lastAffChangedVer = cctx.exchange().
+ lastAffinityChangedTopologyVersion(lastFut.topologyVersion());
- boolean newCrd = false;
+ if (fut.topologyVersion().equals(lastAffChangedVer))
+ exchFut = fut;
+ else if (lastAffChangedVer.after(exchId.topologyVersion())) {
+ // There is a new exchange which should trigger rebalancing.
+ // This reassignment request can be skipped.
+ if (log.isInfoEnabled()) {
+ log.info("Partitions reassignment request skipped due to affinity was already changed" +
+ " [reassignTopVer=" + exchId.topologyVersion() +
+ ", lastAffChangedTopVer=" + lastAffChangedVer + ']');
+ }
- if (!crd) {
- List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes();
+ continue;
+ }
+ }
+ }
+ else if (task instanceof ForceRebalanceExchangeTask) {
+ forcePreload = true;
- crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal();
- }
+ timeout = 0; // Force refresh.
- if (!exchFut.changedAffinity()) {
- GridDhtPartitionsExchangeFuture lastFut = lastFinishedFut.get();
+ exchId = ((ForceRebalanceExchangeTask)task).exchangeId();
+ }
+ else {
+ assert task instanceof GridDhtPartitionsExchangeFuture : task;
- if (lastFut != null) {
- if (!lastFut.changedAffinity()) {
- // If lastFut corresponds to merged exchange, it is essential to use
- // topologyVersion() instead of initialVersion() - nodes joined in this PME
- // will have DiscoCache only for the last version.
- AffinityTopologyVersion lastAffVer = cctx.exchange()
- .lastAffinityChangedTopologyVersion(lastFut.topologyVersion());
+ exchFut = (GridDhtPartitionsExchangeFuture)task;
- cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
- lastAffVer);
- }
- else
- cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
- lastFut.topologyVersion());
+ exchId = exchFut.exchangeId();
+
+ lastInitializedFut = exchFut;
+
+ boolean newCrd = false;
+
+ if (!crd) {
+ List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes();
+
+ crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal();
+ }
+
+ if (!exchFut.changedAffinity()) {
+ GridDhtPartitionsExchangeFuture lastFut = lastFinishedFut.get();
+
+ if (lastFut != null) {
+ if (!lastFut.changedAffinity()) {
+ // If lastFut corresponds to merged exchange, it is essential to use
+ // topologyVersion() instead of initialVersion() - nodes joined in this PME
+ // will have DiscoCache only for the last version.
+ AffinityTopologyVersion lastAffVer = cctx.exchange()
+ .lastAffinityChangedTopologyVersion(lastFut.topologyVersion());
+
+ cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
+ lastAffVer);
}
+ else
+ cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
+ lastFut.topologyVersion());
}
+ }
- exchFut.timeBag().finishGlobalStage("Waiting in exchange queue");
+ exchFut.timeBag().finishGlobalStage("Waiting in exchange queue");
- exchFut.init(newCrd);
+ exchFut.init(newCrd);
- int dumpCnt = 0;
+ int dumpCnt = 0;
- long waitStartNanos = System.nanoTime();
+ long waitStartNanos = System.nanoTime();
- // Call rollback logic only for client node, for server nodes
- // rollback logic is in GridDhtPartitionsExchangeFuture.
- boolean txRolledBack = !cctx.localNode().isClient();
+ // Call rollback logic only for client node, for server nodes
+ // rollback logic is in GridDhtPartitionsExchangeFuture.
+ boolean txRolledBack = !cctx.localNode().isClient();
- IgniteConfiguration cfg = cctx.gridConfig();
+ IgniteConfiguration cfg = cctx.gridConfig();
- final long dumpTimeout = 2 * cfg.getNetworkTimeout();
+ final long dumpTimeout = 2 * cfg.getNetworkTimeout();
- long nextDumpTime = 0;
+ long nextDumpTime = 0;
- while (true) {
- // Read txTimeoutOnPME from configuration after every iteration.
- long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+ while (true) {
+ // Read txTimeoutOnPME from configuration after every iteration.
+ long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+
+ try {
+ long exchTimeout = curTimeout > 0 && !txRolledBack
+ ? Math.min(curTimeout, dumpTimeout)
+ : dumpTimeout;
+
+ blockingSectionBegin();
try {
- long exchTimeout = curTimeout > 0 && !txRolledBack
- ? Math.min(curTimeout, dumpTimeout)
- : dumpTimeout;
+ resVer = exchFut.get(exchTimeout, TimeUnit.MILLISECONDS);
+ } finally {
+ blockingSectionEnd();
+ }
- blockingSectionBegin();
+ onIdle();
+
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ updateHeartbeat();
+
+ if (nextDumpTime <= U.currentTimeMillis()) {
+ U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
+ "topVer=" + exchFut.initialVersion() +
+ ", node=" + cctx.localNodeId() + "]. " +
+ (curTimeout <= 0 && !txRolledBack ? "Consider changing " +
+ "TransactionConfiguration.txTimeoutOnPartitionMapExchange" +
+ " to non default value to avoid this message. " : "") +
+ "Dumping pending objects that might be the cause: ");
try {
- resVer = exchFut.get(exchTimeout, TimeUnit.MILLISECONDS);
- } finally {
- blockingSectionEnd();
+ dumpDebugInfo(exchFut);
}
-
- onIdle();
-
- break;
- }
- catch (IgniteFutureTimeoutCheckedException ignored) {
- updateHeartbeat();
-
- if (nextDumpTime <= U.currentTimeMillis()) {
- U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
- "topVer=" + exchFut.initialVersion() +
- ", node=" + cctx.localNodeId() + "]. " +
- (curTimeout <= 0 && !txRolledBack ? "Consider changing " +
- "TransactionConfiguration.txTimeoutOnPartitionMapExchange" +
- " to non default value to avoid this message. " : "") +
- "Dumping pending objects that might be the cause: ");
-
- try {
- dumpDebugInfo(exchFut);
- }
- catch (Exception e) {
- U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
- }
-
- nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout);
+ catch (Exception e) {
+ U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
}
- long passedMillis = U.millisSinceNanos(waitStartNanos);
+ nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout);
+ }
- if (!txRolledBack && curTimeout > 0 && passedMillis >= curTimeout) {
- txRolledBack = true; // Try automatic rollback only once.
+ long passedMillis = U.millisSinceNanos(waitStartNanos);
- cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion());
- }
- }
- catch (Exception e) {
- if (exchFut.reconnectOnError(e))
- throw new IgniteNeedReconnectException(cctx.localNode(), e);
+ if (!txRolledBack && curTimeout > 0 && passedMillis >= curTimeout) {
+ txRolledBack = true; // Try automatic rollback only once.
- throw e;
+ cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion());
}
}
+ catch (Exception e) {
+ if (exchFut.reconnectOnError(e))
+ throw new IgniteNeedReconnectException(cctx.localNode(), e);
- removeMergedFutures(resVer, exchFut);
+ throw e;
+ }
+ }
- if (log.isTraceEnabled())
- log.trace("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
- this + ']');
+ removeMergedFutures(resVer, exchFut);
- if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
- lastRefresh.compareAndSet(-1, U.currentTimeMillis());
+ if (log.isTraceEnabled())
+ log.trace("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
+ this + ']');
- // Just pick first worker to do this, so we don't
- // invoke topology callback more than once for the
- // same event.
+ if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
+ lastRefresh.compareAndSet(-1, U.currentTimeMillis());
- boolean changed = false;
+ // Just pick first worker to do this, so we don't
+ // invoke topology callback more than once for the
+ // same event.
+ boolean changed = false;
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
+ continue;
- changed |= grp.topology().afterExchange(exchFut);
- }
+ changed |= grp.topology().afterExchange(exchFut);
+ }
- if (!cctx.kernalContext().clientNode() && changed) {
- if (log.isDebugEnabled())
- log.debug("Refresh partitions due to mapping was changed");
+ if (!cctx.kernalContext().clientNode() && changed) {
+ if (log.isDebugEnabled())
+ log.debug("Refresh partitions due to mapping was changed");
- refreshPartitions();
- }
+ refreshPartitions();
}
+ }
- if (rebalanceRequired(exchFut)) {
- if (rebalanceDelay > 0)
- U.sleep(rebalanceDelay);
+ if (rebalanceRequired(exchFut)) {
+ if (rebalanceDelay > 0)
+ U.sleep(rebalanceDelay);
- assignsMap = new HashMap<>();
+ assignsMap = new HashMap<>();
- IgniteCacheSnapshotManager snp = cctx.snapshot();
+ IgniteCacheSnapshotManager snp = cctx.snapshot();
- for (final CacheGroupContext grp : cctx.cache().cacheGroups()) {
- long delay = grp.config().getRebalanceDelay();
+ for (final CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ long delay = grp.config().getRebalanceDelay();
- boolean disableRebalance = snp.partitionsAreFrozen(grp);
+ boolean disableRebalance = snp.partitionsAreFrozen(grp);
- GridDhtPreloaderAssignments assigns = null;
+ GridDhtPreloaderAssignments assigns = null;
- // Don't delay for dummy reassigns to avoid infinite recursion.
- if ((delay == 0 || forcePreload) && !disableRebalance)
- assigns = grp.preloader().generateAssignments(exchId, exchFut);
+ // Don't delay for dummy reassigns to avoid infinite recursion.
+ if ((delay == 0 || forcePreload) && !disableRebalance)
+ assigns = grp.preloader().generateAssignments(exchId, exchFut);
- assignsMap.put(grp.groupId(), assigns);
+ assignsMap.put(grp.groupId(), assigns);
- if (resVer == null && !grp.isLocal())
- resVer = grp.topology().readyTopologyVersion();
- }
+ if (resVer == null && !grp.isLocal())
+ resVer = grp.topology().readyTopologyVersion();
}
-
- if (resVer == null)
- resVer = exchId.topologyVersion();
- }
- finally {
- // Must flip busy flag before assignments are given to demand workers.
- busy = false;
}
+ if (resVer == null)
+ resVer = exchId.topologyVersion();
+
if (!F.isEmpty(assignsMap)) {
int size = assignsMap.size();
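
To illustrate the reworked exchange worker above: a RebalanceReassignExchangeTask is now processed only while its exchange is still the latest affinity-changing one, otherwise it is skipped because a newer exchange will trigger rebalancing on its own. Below is a minimal, self-contained sketch of that check, using plain long values as hypothetical stand-ins for AffinityTopologyVersion (not the actual Ignite classes):

    // Simplified sketch, not the actual Ignite implementation.
    public class ReassignSkipSketch {
        /**
         * Returns true if a reassignment requested at reassignTopVer should still be processed.
         * It is skipped when a newer affinity change (lastAffChangedTopVer) has already happened,
         * because that newer exchange will trigger rebalancing on its own.
         */
        static boolean shouldProcessReassign(long reassignTopVer, long lastAffChangedTopVer) {
            if (lastAffChangedTopVer > reassignTopVer) {
                System.out.println("Reassignment skipped [reassignTopVer=" + reassignTopVer +
                    ", lastAffChangedTopVer=" + lastAffChangedTopVer + ']');

                return false;
            }

            return true;
        }

        public static void main(String[] args) {
            System.out.println(shouldProcessReassign(5, 5)); // true  - still the latest affinity change.
            System.out.println(shouldProcessReassign(5, 7)); // false - superseded by a newer exchange.
        }
    }
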
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index c70f86b..3422a16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -76,8 +76,9 @@ public interface GridCachePreloader {
* @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs.
* @return Partition assignments which will be requested from supplier nodes.
*/
- @Nullable public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId,
- @Nullable GridDhtPartitionsExchangeFuture exchFut);
+ @Nullable public GridDhtPreloaderAssignments generateAssignments(
+ GridDhtPartitionExchangeId exchId,
+ @Nullable GridDhtPartitionsExchangeFuture exchFut);
/**
* Adds assignments to preloader.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index b382ba4..a98d192 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -144,8 +144,9 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId,
- GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public GridDhtPreloaderAssignments generateAssignments(
+ GridDhtPartitionExchangeId exchId,
+ GridDhtPartitionsExchangeFuture exchFut) {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 5277085..514dd1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -327,7 +327,7 @@ public class GridDhtPartitionDemander {
return null;
}
- final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId, next, lastCancelledTime);
+ final RebalanceFuture fut = new RebalanceFuture(grp, lastExchangeFut, assignments, log, rebalanceId, next, lastCancelledTime);
if (!grp.localWalEnabled()) {
fut.listen(new IgniteInClosureX<IgniteInternalFuture<Boolean>>() {
@@ -482,7 +482,7 @@ public class GridDhtPartitionDemander {
if (node == null) {
if (log.isDebugEnabled())
- log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+ log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
return;
}
@@ -490,13 +490,13 @@ public class GridDhtPartitionDemander {
// Topology already changed (for the future that supply message based on).
if (!fut.isActual(supplyMsg.rebalanceId())) {
if (log.isDebugEnabled())
- log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+ log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
return;
}
if (log.isDebugEnabled())
- log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+ log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
// Check whether there were error during supply message unmarshalling process.
if (supplyMsg.classError() != null) {
@@ -616,7 +616,7 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (state is not MOVING): " +
- "[" + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + "]");
+ '[' + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + ']');
}
}
else {
@@ -624,7 +624,7 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (affinity changed): " +
- "[" + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + "]");
+ '[' + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + ']');
}
}
@@ -662,7 +662,7 @@ public class GridDhtPartitionDemander {
else {
if (log.isDebugEnabled())
log.debug("Will not request next demand message [" + demandRoutineInfo(nodeId, supplyMsg) +
- ", rebalanceFuture=" + fut + "]");
+ ", rebalanceFuture=" + fut + ']');
}
}
catch (IgniteSpiException | IgniteCheckedException e) {
@@ -985,14 +985,14 @@ public class GridDhtPartitionDemander {
* Internal states of rebalance future.
*/
private enum RebalanceFutureState {
- /** Init. */
+ /** Initial state. */
INIT,
- /** Started. */
+ /** Rebalance future started and requested required partitions. */
STARTED,
- /** Marked as cancelled. */
- MARK_CANCELLED,
+ /** Marked as cancelled. This means partitions will not be requested. */
+ MARK_CANCELLED
}
/**
@@ -1018,13 +1018,17 @@ public class GridDhtPartitionDemander {
/** Remaining. */
private final Map<UUID, IgniteDhtDemandedPartitionsMap> remaining = new HashMap<>();
- /** Missed. */
+ /** Collection of missed partitions and partitions that could not be rebalanced from a supplier. */
private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
/** Exchange ID. */
@GridToStringExclude
private final GridDhtPartitionExchangeId exchId;
+ /** Corresponding exchange future. */
+ @GridToStringExclude
+ private final GridDhtPartitionsExchangeFuture exchFut;
+
/** Topology version. */
private final AffinityTopologyVersion topVer;
@@ -1076,7 +1080,10 @@ public class GridDhtPartitionDemander {
private final Map<ClusterNode, Set<Integer>> rebalancingParts;
/**
- * @param grp Cache group.
+ * Creates a new rebalance future.
+ *
+ * @param grp Cache group context.
+ * @param exchFut Exchange future.
* @param assignments Assignments.
* @param log Logger.
* @param rebalanceId Rebalance id.
@@ -1085,17 +1092,21 @@ public class GridDhtPartitionDemander {
*/
RebalanceFuture(
CacheGroupContext grp,
+ GridDhtPartitionsExchangeFuture exchFut,
GridDhtPreloaderAssignments assignments,
IgniteLogger log,
long rebalanceId,
RebalanceFuture next,
- AtomicLong lastCancelledTime) {
+ AtomicLong lastCancelledTime
+ ) {
assert assignments != null;
+ assert assignments != null : "Assignments must not be null.";
this.rebalancingParts = U.newHashMap(assignments.size());
this.assignments = assignments;
exchId = assignments.exchangeId();
topVer = assignments.topologyVersion();
+ this.exchFut = exchFut;
this.next = next;
this.lastCancelledTime = lastCancelledTime;
@@ -1142,6 +1153,7 @@ public class GridDhtPartitionDemander {
this.assignments = null;
this.exchId = null;
this.topVer = null;
+ this.exchFut = null;
this.ctx = null;
this.grp = null;
this.log = null;
@@ -1476,6 +1488,19 @@ public class GridDhtPartitionDemander {
if (isDone())
return;
+ IgniteDhtDemandedPartitionsMap parts = remaining.get(nodeId);
+
+ assert parts != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
+ ", part=" + p + "]";
+
+ if (parts.historicalMap().contains(p)) {
+ // The partition p cannot be wal rebalanced,
+ // let's exclude the given nodeId and give a try to full rebalance.
+ exchFut.markNodeAsInapplicableForHistoricalRebalance(nodeId);
+ }
+ else
+ exchFut.markNodeAsInapplicableForFullRebalance(nodeId, grp.groupId(), p);
+
missed.computeIfAbsent(nodeId, k -> new HashSet<>());
missed.get(nodeId).add(p);
@@ -1611,7 +1636,7 @@ public class GridDhtPartitionDemander {
onDone(false); // Finished but has missed partitions, will force dummy exchange
- ctx.exchange().forceReassign(exchId);
+ ctx.exchange().forceReassign(exchId, exchFut);
return;
}
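
The demander-side bookkeeping added above can be summarized as follows. This is a minimal sketch with hypothetical simplified types (plain UUIDs and maps instead of the Ignite internals), not the actual RebalanceFuture code:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    // Simplified sketch, not the actual Ignite implementation.
    public class MissedPartitionSketch {
        /** Suppliers excluded from WAL (historical) rebalance. */
        private final Set<UUID> historicalExclusions = new HashSet<>();

        /** Partitions excluded from full rebalance, keyed by "groupId/supplierId". */
        private final Map<String, Set<Integer>> fullExclusions = new HashMap<>();

        /**
         * Mirrors the bookkeeping added to RebalanceFuture above: a missed partition excludes
         * the supplier either from historical rebalance as a whole (so the partition can be
         * retried via full rebalance) or from full rebalance of that one partition.
         */
        void onMissedPartition(UUID supplierId, int grpId, int part, boolean demandedHistorically) {
            if (demandedHistorically)
                historicalExclusions.add(supplierId);
            else
                fullExclusions.computeIfAbsent(grpId + "/" + supplierId, k -> new HashSet<>()).add(part);

            // The real demander then calls ctx.exchange().forceReassign(exchId, exchFut),
            // so assignments are regenerated with these exclusions applied.
        }

        public static void main(String[] args) {
            MissedPartitionSketch sketch = new MissedPartitionSketch();

            UUID supplier = UUID.randomUUID();

            sketch.onMissedPartition(supplier, 1, 7, true);  // WAL iteration failed: retry via full rebalance.
            sketch.onMissedPartition(supplier, 1, 9, false); // Partition unavailable on supplier: pick another owner.

            System.out.println("Historical exclusions: " + sketch.historicalExclusions);
            System.out.println("Full-rebalance exclusions: " + sketch.fullExclusions);
        }
    }
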
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 8dfb7c2..d054b05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -18,10 +18,12 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -47,7 +49,9 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -214,6 +218,15 @@ class GridDhtPartitionSupplier {
SupplyContext sctx = null;
+ Set<Integer> remainingParts = null;
+
+ GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage(
+ demandMsg.rebalanceId(),
+ grp.groupId(),
+ demandMsg.topologyVersion(),
+ grp.deploymentEnabled()
+ );
+
try {
synchronized (scMap) {
sctx = scMap.remove(contextId);
@@ -257,15 +270,6 @@ class GridDhtPartitionSupplier {
else
maxBatchesCnt = 1;
- GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage(
- demandMsg.rebalanceId(),
- grp.groupId(),
- demandMsg.topologyVersion(),
- grp.deploymentEnabled()
- );
-
- Set<Integer> remainingParts;
-
if (sctx == null || sctx.iterator == null) {
iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion());
@@ -455,42 +459,65 @@ class GridDhtPartitionSupplier {
}
else
U.error(log, "Failed to continue supplying ["
- + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t);
+ + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t);
try {
if (sctx != null)
clearContext(sctx, log);
- else if (iter != null)
- iter.close();
}
catch (Throwable t1) {
U.error(log, "Failed to cleanup supplying context ["
- + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1);
+ + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
}
if (!sendErrMsg)
return;
+ boolean fallbackToFullRebalance = X.hasCause(t, IgniteHistoricalIteratorException.class);
+
try {
- GridDhtPartitionSupplyMessageV2 errMsg = new GridDhtPartitionSupplyMessageV2(
- demandMsg.rebalanceId(),
- grp.groupId(),
- demandMsg.topologyVersion(),
- grp.deploymentEnabled(),
- t
- );
+ GridDhtPartitionSupplyMessage errMsg;
+
+ if (fallbackToFullRebalance) {
+ // Mark the last checkpoint as not applicable for WAL rebalance.
+ grp.shared().database().lastCheckpointInapplicableForWalRebalance(grp.groupId());
+
+ // Mark all remaining partitions as missed to trigger full rebalance.
+ if (iter == null && F.isEmpty(remainingParts)) {
+ remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
+ remainingParts.addAll(demandMsg.partitions().historicalSet());
+ }
+
+ for (int p : Optional.ofNullable(remainingParts).orElseGet(Collections::emptySet))
+ supplyMsg.missed(p);
+
+ errMsg = supplyMsg;
+ }
+ else {
+ errMsg = new GridDhtPartitionSupplyMessageV2(
+ demandMsg.rebalanceId(),
+ grp.groupId(),
+ demandMsg.topologyVersion(),
+ grp.deploymentEnabled(),
+ t
+ );
+ }
reply(topicId, demanderNode, demandMsg, errMsg, contextId);
}
catch (Throwable t1) {
U.error(log, "Failed to send supply error message ["
- + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1);
+ + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
}
- grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
- new IgniteCheckedException("Failed to continue supplying ["
- + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t)
- ));
+ // If fallback to full rebalance is possible then let's try to switch to it
+ // instead of triggering failure handler.
+ if (!fallbackToFullRebalance) {
+ grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
+ new IgniteCheckedException("Failed to continue supplying ["
+ + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t)
+ ));
+ }
}
}
@@ -537,7 +564,7 @@ class GridDhtPartitionSupplier {
* @param demander Recipient of supply message.
* @param demandMsg Demand message.
* @param supplyMsg Supply message.
- * @param contextId Supply context id.
+ * @param ctxId Supply context id.
* @return {@code True} if message was sent, {@code false} if recipient left grid.
* @throws IgniteCheckedException If failed.
*/
@@ -546,7 +573,7 @@ class GridDhtPartitionSupplier {
ClusterNode demander,
GridDhtPartitionDemandMessage demandMsg,
GridDhtPartitionSupplyMessage supplyMsg,
- T3<UUID, Integer, AffinityTopologyVersion> contextId
+ T3<UUID, Integer, AffinityTopologyVersion> ctxId
) throws IgniteCheckedException {
try {
if (log.isDebugEnabled())
@@ -567,7 +594,7 @@ class GridDhtPartitionSupplier {
log.debug("Failed to send supply message (demander left): [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
synchronized (scMap) {
- clearContext(scMap.remove(contextId), log);
+ clearContext(scMap.remove(ctxId), log);
}
return false;
@@ -588,21 +615,21 @@ class GridDhtPartitionSupplier {
/**
* Saves supply context with given parameters to {@code scMap}.
*
- * @param contextId Supply context id.
+ * @param ctxId Supply context id.
* @param entryIt Entries rebalance iterator.
* @param remainingParts Set of partitions that weren't sent yet.
* @param rebalanceId Rebalance id.
*/
private void saveSupplyContext(
- T3<UUID, Integer, AffinityTopologyVersion> contextId,
+ T3<UUID, Integer, AffinityTopologyVersion> ctxId,
IgniteRebalanceIterator entryIt,
Set<Integer> remainingParts,
long rebalanceId
) {
synchronized (scMap) {
- assert scMap.get(contextId) == null;
+ assert scMap.get(ctxId) == null;
- scMap.put(contextId, new SupplyContext(entryIt, remainingParts, rebalanceId));
+ scMap.put(ctxId, new SupplyContext(entryIt, remainingParts, rebalanceId));
}
}
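
The supplier's new error handling boils down to one decision: if the failure is caused by the historical iterator, report the remaining partitions as missed (so the demander falls back to full rebalance) and do not invoke the failure handler; otherwise keep the old behavior of replying with an error and triggering a critical failure. A minimal sketch under those assumptions, where the exception type and the hasCause helper are simplified stand-ins rather than the real IgniteHistoricalIteratorException and X.hasCause:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    // Simplified sketch, not the actual Ignite implementation.
    public class SupplierFallbackSketch {
        /** Stand-in for IgniteHistoricalIteratorException. */
        static class HistoricalIteratorFailure extends RuntimeException {
            HistoricalIteratorFailure(Throwable cause) { super(cause); }
        }

        /**
         * Returns the set of partitions to report as missed. Non-empty only when the failure
         * comes from the historical iterator; in that case the real code also marks the last
         * checkpoint as inapplicable for WAL rebalance and skips the failure handler.
         */
        static Set<Integer> buildErrorReply(Throwable t, Set<Integer> remainingParts) {
            boolean fallbackToFullRebalance = hasCause(t, HistoricalIteratorFailure.class);

            if (fallbackToFullRebalance)
                return new HashSet<>(remainingParts); // "Missed" partitions in the supply message.

            // Non-recoverable supplier error: reply with an error message and let the failure handler act.
            return Collections.emptySet();
        }

        static boolean hasCause(Throwable t, Class<? extends Throwable> cls) {
            for (Throwable c = t; c != null; c = c.getCause()) {
                if (cls.isInstance(c))
                    return true;
            }
            return false;
        }

        public static void main(String[] args) {
            Set<Integer> remaining = new HashSet<>(java.util.Arrays.asList(0, 1, 2));

            Throwable walFailure = new RuntimeException(
                new HistoricalIteratorFailure(new java.io.IOException("bad WAL segment")));

            System.out.println("Missed on WAL failure: " + buildErrorReply(walFailure, remaining));
            System.out.println("Missed on other failure: " + buildErrorReply(new RuntimeException("oops"), remaining));
        }
    }
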
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c940813..a34a098 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -313,6 +313,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
@GridToStringExclude
private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
+ /** Set of nodes that cannot be used for wal rebalancing due to some reason. */
+ private Set<UUID> exclusionsFromHistoricalRebalance = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ /**
+ * Set of nodes that cannot be used for full rebalancing due to missed partitions.
+ * Mapping pair of groupId and nodeId to set of partitions.
+ */
+ private Map<T2<Integer, UUID>, Set<Integer>> exclusionsFromFullRebalance = new ConcurrentHashMap<>();
+
/** Reserved max available history for calculation of history supplier on coordinator. */
private volatile Map<Integer /** Group. */, Map<Integer /** Partition */, Long /** Counter. */>> partHistReserved;
@@ -537,7 +546,68 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @return List of IDs of history supplier nodes or empty list if these doesn't exist.
*/
@Nullable public List<UUID> partitionHistorySupplier(int grpId, int partId, long cntrSince) {
- return partHistSuppliers.getSupplier(grpId, partId, cntrSince);
+ List<UUID> histSuppliers = partHistSuppliers.getSupplier(grpId, partId, cntrSince);
+
+ return histSuppliers.stream().filter((supplier) -> !exclusionsFromHistoricalRebalance.contains(supplier))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Marks the given node as not applicable for historical rebalancing.
+ *
+ * @param nodeId Node id that should not be used for wal rebalancing (aka historical supplier).
+ */
+ public void markNodeAsInapplicableForHistoricalRebalance(UUID nodeId) {
+ exclusionsFromHistoricalRebalance.add(nodeId);
+ }
+
+ /**
+ * Marks the given node as not applicable for full rebalancing
+ * for the given group and partition.
+ *
+ * @param nodeId Node id that should not be used for full rebalancing.
+ * @param grpId Cache group id.
+ * @param p Partition id.
+ */
+ public void markNodeAsInapplicableForFullRebalance(UUID nodeId, int grpId, int p) {
+ Set<Integer> parts = exclusionsFromFullRebalance.computeIfAbsent(new T2<>(grpId, nodeId), t2 ->
+ Collections.newSetFromMap(new ConcurrentHashMap<>())
+ );
+
+ parts.add(p);
+ }
+
+ /**
+ * @return {@code true} if there are nodes which are inapplicable for historical rebalancing.
+ */
+ public boolean hasInapplicableNodesForHistoricalRebalance() {
+ return !exclusionsFromHistoricalRebalance.isEmpty();
+ }
+
+ /**
+ * @return {@code true} if there are nodes which are inapplicable for full rebalancing.
+ */
+ public boolean hasInapplicableNodesForFullRebalance() {
+ return !exclusionsFromFullRebalance.isEmpty();
+ }
+
+ /**
+ * @return {@code true} if there are nodes which are inapplicable for rebalancing.
+ */
+ public boolean hasInapplicableNodesForRebalance() {
+ return hasInapplicableNodesForHistoricalRebalance() || hasInapplicableNodesForFullRebalance();
+ }
+
+ /**
+ * @param nodeId Node id to check.
+ * @param grpId Cache group id.
+ * @param p Partition id.
+ * @return {@code true} if the node is applicable for full rebalancing.
+ */
+ public boolean isNodeApplicableForFullRebalance(UUID nodeId, int grpId, int p) {
+ return Optional.ofNullable(exclusionsFromFullRebalance.get(new T2<>(grpId, nodeId)))
+ .map(s -> !s.contains(p))
+ .orElse(true);
}
/**
@@ -2696,6 +2766,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
newCrdFut = null;
exchangeLocE = null;
exchangeGlobalExceptions.clear();
+ exclusionsFromHistoricalRebalance.clear();
+ exclusionsFromFullRebalance.clear();
if (finishState != null)
finishState.cleanUp();
}
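
The exclusion sets added to the exchange future are consulted when history suppliers are picked, as partitionHistorySupplier(..) above now filters out excluded nodes. A minimal sketch of that filtering with hypothetical simplified fields, not the actual GridDhtPartitionsExchangeFuture:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.UUID;
    import java.util.stream.Collectors;

    // Simplified sketch, not the actual Ignite implementation.
    public class SupplierExclusionSketch {
        /** Nodes excluded from historical (WAL) rebalance. */
        private final Set<UUID> historicalExclusions = new HashSet<>();

        /** Sketch of markNodeAsInapplicableForHistoricalRebalance(..). */
        void markNodeAsInapplicableForHistoricalRebalance(UUID nodeId) {
            historicalExclusions.add(nodeId);
        }

        /** Sketch of the filtering that partitionHistorySupplier(..) now applies to candidate suppliers. */
        List<UUID> applicableHistorySuppliers(List<UUID> candidates) {
            return candidates.stream()
                .filter(id -> !historicalExclusions.contains(id))
                .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            SupplierExclusionSketch fut = new SupplierExclusionSketch();

            UUID healthy = UUID.randomUUID();
            UUID failedOnWal = UUID.randomUUID();

            fut.markNodeAsInapplicableForHistoricalRebalance(failedOnWal);

            // Only nodes that haven't failed WAL iteration remain eligible as history suppliers.
            System.out.println(fut.applicableHistorySuppliers(Arrays.asList(healthy, failedOnWal)));
        }
    }
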
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 23ff455..bdaae9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
@@ -186,7 +187,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
AffinityTopologyVersion topVer = top.readyTopologyVersion();
- assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) :
+ assert exchFut == null ||
+ exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) ||
+ exchFut.context().events().topologyVersion().equals(ctx.exchange().lastAffinityChangedTopologyVersion(top.readyTopologyVersion())) :
"Topology version mismatch [exchId=" + exchId +
", grp=" + grp.name() +
", topVer=" + top.readyTopologyVersion() + ']';
@@ -275,7 +278,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partitions);
}
else {
- List<ClusterNode> picked = remoteOwners(p, topVer);
+ int partId = p;
+ List<ClusterNode> picked = remoteOwners(p, topVer, node -> {
+ if (exchFut != null && !exchFut.isNodeApplicableForFullRebalance(node.id(), grp.groupId(), partId))
+ return false;
+
+ return true;
+ });
if (!picked.isEmpty()) {
ClusterNode n = picked.get(p % picked.size());
@@ -319,12 +328,24 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Nodes owning this partition.
*/
private List<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+ return remoteOwners(p, topVer, node -> true);
+ }
+
+ /**
+ * Returns remote owners (excluding local node) for specified partition {@code p}
+ * which is additionally filtered by the specified predicate.
+ *
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Nodes owning this partition.
+ */
+ private List<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer, IgnitePredicate<ClusterNode> pred) {
List<ClusterNode> owners = grp.topology().owners(p, topVer);
List<ClusterNode> res = new ArrayList<>(owners.size());
for (ClusterNode owner : owners) {
- if (!owner.id().equals(ctx.localNodeId()))
+ if (!owner.id().equals(ctx.localNodeId()) && pred.apply(owner))
res.add(owner);
}
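
The overloaded remoteOwners(..) above simply adds a per-node predicate to the owner filtering, which the preloader uses to skip nodes already marked as inapplicable for full rebalance of a given partition. A minimal sketch with hypothetical simplified types (UUIDs instead of ClusterNode, java.util.function.Predicate instead of IgnitePredicate):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;
    import java.util.function.Predicate;

    // Simplified sketch, not the actual Ignite implementation.
    public class RemoteOwnersSketch {
        /**
         * Owners of a partition, excluding the local node and any node rejected by the
         * supplied predicate (e.g. nodes inapplicable for full rebalance of this partition).
         */
        static List<UUID> remoteOwners(List<UUID> owners, UUID localNodeId, Predicate<UUID> pred) {
            List<UUID> res = new ArrayList<>(owners.size());

            for (UUID owner : owners) {
                if (!owner.equals(localNodeId) && pred.test(owner))
                    res.add(owner);
            }

            return res;
        }

        public static void main(String[] args) {
            UUID local = UUID.randomUUID();
            UUID ok = UUID.randomUUID();
            UUID excluded = UUID.randomUUID();

            System.out.println(remoteOwners(Arrays.asList(local, ok, excluded), local, n -> !n.equals(excluded)));
        }
    }
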
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIteratorException.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIteratorException.java
index 7e473be..5b641b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIteratorException.java
@@ -13,37 +13,36 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.IgniteException;
/**
- *
+ * Thrown when {@link IgniteHistoricalIterator} cannot iterate over WAL for some reason.
*/
-public class RebalanceReassignExchangeTask implements CachePartitionExchangeWorkerTask {
+public class IgniteHistoricalIteratorException extends IgniteException {
/** */
- private final GridDhtPartitionExchangeId exchId;
+ private static final long serialVersionUID = 0L;
/**
- * @param exchId Exchange ID.
+ * Creates a new exception with the specified cause.
+ *
+ * @param cause Cause.
*/
- public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId) {
- assert exchId != null;
-
- this.exchId = exchId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipForExchangeMerge() {
- return true;
+ public IgniteHistoricalIteratorException(Throwable cause) {
+ super(cause);
}
/**
- * @return Exchange ID.
+ * Creates a new exception with the specified message and cause.
+ *
+ * @param msg Detail message.
+ * @param cause Cause.
*/
- public GridDhtPartitionExchangeId exchangeId() {
- return exchId;
+ public IgniteHistoricalIteratorException(String msg, Throwable cause) {
+ super(msg, cause);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
index 7e473be..5cffcb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
@@ -26,13 +26,19 @@ public class RebalanceReassignExchangeTask implements CachePartitionExchangeWork
/** */
private final GridDhtPartitionExchangeId exchId;
+ /** */
+ private final GridDhtPartitionsExchangeFuture exchFut;
+
/**
* @param exchId Exchange ID.
+ * @param exchFut Exchange future.
*/
- public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId) {
+ public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) {
assert exchId != null;
+ assert exchFut != null;
this.exchId = exchId;
+ this.exchFut = exchFut;
}
/** {@inheritDoc} */
@@ -46,4 +52,11 @@ public class RebalanceReassignExchangeTask implements CachePartitionExchangeWork
public GridDhtPartitionExchangeId exchangeId() {
return exchId;
}
+
+ /**
+ * @return Exchange future.
+ */
+ public GridDhtPartitionsExchangeFuture future() {
+ return exchFut;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 4f325a2..aa0520c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
@@ -105,6 +106,7 @@ import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -1002,7 +1004,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override @Nullable protected IgniteHistoricalIterator historicalIterator(
- CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) throws IgniteCheckedException {
+ CachePartitionPartialCountersMap partCntrs,
+ Set<Integer> missing
+ ) throws IgniteCheckedException {
if (partCntrs == null || partCntrs.isEmpty())
return null;
@@ -1022,14 +1026,22 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
FileWALPointer minPtr = (FileWALPointer)database.checkpointHistory().searchEarliestWalPointer(grp.groupId(), partsCounters);
- WALIterator it = grp.shared().wal().replay(minPtr);
+ try {
+ WALIterator it = grp.shared().wal().replay(minPtr);
+
+ WALHistoricalIterator histIt = new WALHistoricalIterator(log, grp, partCntrs, it);
- WALHistoricalIterator iterator = new WALHistoricalIterator(log, grp, partCntrs, it);
+ // Add to the missing set the historical partitions that could not be reserved.
+ missing.addAll(histIt.missingParts);
- // Add historical partitions which are unabled to reserve to missing set.
- missing.addAll(iterator.missingParts);
+ return histIt;
+ }
+ catch (Exception ex) {
+ if (!X.hasCause(ex, IgniteHistoricalIteratorException.class))
+ throw new IgniteHistoricalIteratorException(ex);
- return iterator;
+ throw ex;
+ }
}
/** {@inheritDoc} */
@@ -1349,92 +1361,98 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
*
*/
private void advance() {
- next = null;
+ try {
+ next = null;
- outer: while (doneParts.size() != partMap.size()) {
- if (entryIt != null) {
- while (entryIt.hasNext()) {
- DataEntry entry = entryIt.next();
+ outer:
+ while (doneParts.size() != partMap.size()) {
+ if (entryIt != null) {
+ while (entryIt.hasNext()) {
+ DataEntry entry = entryIt.next();
- if (cacheIds.contains(entry.cacheId())) {
- int idx = partMap.partitionIndex(entry.partitionId());
+ if (cacheIds.contains(entry.cacheId())) {
+ int idx = partMap.partitionIndex(entry.partitionId());
- if (idx < 0 || missingParts.contains(idx))
- continue;
+ if (idx < 0 || missingParts.contains(idx))
+ continue;
- long from = partMap.initialUpdateCounterAt(idx);
- long to = partMap.updateCounterAt(idx);
+ long from = partMap.initialUpdateCounterAt(idx);
+ long to = partMap.updateCounterAt(idx);
- if (entry.partitionCounter() > from && entry.partitionCounter() <= to) {
- // Partition will be marked as done for current entry on next iteration.
- if (++rebalancedCntrs[idx] == to)
- donePart = entry.partitionId();
+ if (entry.partitionCounter() > from && entry.partitionCounter() <= to) {
+ // Partition will be marked as done for current entry on next iteration.
+ if (++rebalancedCntrs[idx] == to)
+ donePart = entry.partitionId();
- next = entry;
+ next = entry;
- return;
+ return;
+ }
}
}
}
- }
- entryIt = null;
+ entryIt = null;
- // Search for next DataEntry while applying rollback counters.
- while (walIt.hasNext()) {
- IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next();
+ // Search for next DataEntry while applying rollback counters.
+ while (walIt.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next();
- if (rec.get2() instanceof DataRecord) {
- DataRecord data = (DataRecord)rec.get2();
+ if (rec.get2() instanceof DataRecord) {
+ DataRecord data = (DataRecord)rec.get2();
- entryIt = data.writeEntries().iterator();
+ entryIt = data.writeEntries().iterator();
- // Move on to the next valid data entry.
- continue outer;
- }
- else if (rec.get2() instanceof RollbackRecord) {
- RollbackRecord rbRec = (RollbackRecord)rec.get2();
+ // Move on to the next valid data entry.
+ continue outer;
+ }
+ else if (rec.get2() instanceof RollbackRecord) {
+ RollbackRecord rbRec = (RollbackRecord)rec.get2();
- if (grp.groupId() == rbRec.groupId()) {
- int idx = partMap.partitionIndex(rbRec.partitionId());
+ if (grp.groupId() == rbRec.groupId()) {
+ int idx = partMap.partitionIndex(rbRec.partitionId());
- if (idx < 0 || missingParts.contains(idx))
- continue;
+ if (idx < 0 || missingParts.contains(idx))
+ continue;
- long from = partMap.initialUpdateCounterAt(idx);
- long to = partMap.updateCounterAt(idx);
+ long from = partMap.initialUpdateCounterAt(idx);
+ long to = partMap.updateCounterAt(idx);
- rebalancedCntrs[idx] += rbRec.overlap(from, to);
+ rebalancedCntrs[idx] += rbRec.overlap(from, to);
- if (rebalancedCntrs[idx] == partMap.updateCounterAt(idx)) {
- if (log.isDebugEnabled()) {
- log.debug("Partition done [grpId=" + grp.groupId() +
- ", partId=" + donePart +
- ", from=" + from +
- ", to=" + to + ']');
- }
+ if (rebalancedCntrs[idx] == partMap.updateCounterAt(idx)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Partition done [grpId=" + grp.groupId() +
+ ", partId=" + donePart +
+ ", from=" + from +
+ ", to=" + to + ']');
+ }
- doneParts.add(rbRec.partitionId()); // Add to done set immediately.
+ doneParts.add(rbRec.partitionId()); // Add to done set immediately.
+ }
}
}
}
- }
- if (entryIt == null && doneParts.size() != partMap.size()) {
- for (int i = 0; i < partMap.size(); i++) {
- int p = partMap.partitionAt(i);
+ if (entryIt == null && doneParts.size() != partMap.size()) {
+ for (int i = 0; i < partMap.size(); i++) {
+ int p = partMap.partitionAt(i);
- if (!doneParts.contains(p)) {
- log.warning("Some partition entries were missed during historical rebalance [grp=" + grp + ", part=" + p + ", missed=" +
- (partMap.updateCounterAt(i) - rebalancedCntrs[i]) + ']');
+ if (!doneParts.contains(p)) {
+ log.warning("Some partition entries were missed during historical rebalance [grp=" + grp + ", part=" + p + ", missed=" +
+ (partMap.updateCounterAt(i) - rebalancedCntrs[i]) + ']');
- doneParts.add(p);
- }
- }
+ doneParts.add(p);
+ }
+ }
- return;
+ return;
+ }
}
}
+ catch (Exception ex) {
+ throw new IgniteHistoricalIteratorException(ex);
+ }
}
}
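
The pattern used in historicalIterator() and advance() above is to wrap any WAL iteration failure into the dedicated exception type so the supplier can detect it by cause and fall back to full rebalance instead of failing the node. A minimal sketch of that wrapping, with a hypothetical stand-in exception rather than the real IgniteHistoricalIteratorException:

    // Simplified sketch, not the actual Ignite implementation.
    public class HistoricalIteratorWrapSketch {
        /** Stand-in for IgniteHistoricalIteratorException. */
        static class HistoricalIteratorFailure extends RuntimeException {
            HistoricalIteratorFailure(Throwable cause) { super(cause); }
        }

        /**
         * Runs one step of WAL iteration and (re)throws any failure as the dedicated type,
         * so callers up the stack can recognize it via cause-chain inspection.
         */
        static <T> T wrapIteratorFailures(java.util.concurrent.Callable<T> step) {
            try {
                return step.call();
            }
            catch (Exception ex) {
                if (ex instanceof HistoricalIteratorFailure)
                    throw (HistoricalIteratorFailure)ex;

                throw new HistoricalIteratorFailure(ex);
            }
        }

        public static void main(String[] args) {
            try {
                wrapIteratorFailures(() -> { throw new java.io.IOException("truncated WAL segment"); });
            }
            catch (HistoricalIteratorFailure e) {
                System.out.println("Detected historical-iterator failure, cause: " + e.getCause());
            }
        }
    }
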
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
index 2bc7fc7..2452662 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
@@ -44,10 +45,13 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecora
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED;
+
/** */
public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstractTest {
/** Rebalance cache name. */
@@ -84,6 +88,8 @@ public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstract
if (name.equals(getTestIgniteInstanceName(NODE_NAME_WITH_TEST_FILE_FACTORY))) {
conf.setFileIOFactory(new FailingFileIOFactory(canFailFirstNode));
+ cfg.setIncludeEventTypes(EVT_CACHE_REBALANCE_PART_SUPPLIED);
+
cfg.setFailureHandler(new TestFailureHandler());
}
else
@@ -132,8 +138,19 @@ public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstract
populateCache(ig, TEST_REBALANCE_CACHE, 3_000, 6_000);
+ // Breaks historical rebalance. The second node will try to switch to full rebalance.
canFailFirstNode.set(true);
+ // Break full rebalance.
+ IgnitePredicate<CacheRebalancingEvent> locLsnr = evt -> {
+ if (TEST_REBALANCE_CACHE.equals(evt.cacheName()))
+ throw new AssertionError(new IOException("Test crash"));
+
+ return true;
+ };
+
+ ig.events().localListen(locLsnr, EVT_CACHE_REBALANCE_PART_SUPPLIED);
+
startGrid(1);
WAIT_ON_SUPPLY_MESSAGE_FAILURE.await();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 02bbf6d..191be72 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -19,20 +19,28 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.OpenOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
@@ -41,31 +49,48 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.WalTestUtils;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
/**
@@ -79,11 +104,17 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
private static final int PARTS_CNT = 32;
/** Block message predicate to set to Communication SPI in node configuration. */
- private IgniteBiPredicate<ClusterNode, Message> blockMessagePredicate;
+ private IgniteBiPredicate<ClusterNode, Message> blockMsgPred;
+
+ /** Record message predicate to set to Communication SPI in node configuration. */
+ private IgniteBiPredicate<ClusterNode, Message> recordMsgPred;
/** */
private int backups;
+ /** User attributes. */
+ private Map<String, Serializable> userAttrs = new HashMap<>();
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); //to make all rebalance wal-based
@@ -116,11 +147,15 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
cfg.setCommunicationSpi(new WalRebalanceCheckingCommunicationSpi());
- if (blockMessagePredicate != null) {
- TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) cfg.getCommunicationSpi();
+ if (blockMsgPred != null)
+ ((TestRecordingCommunicationSpi) cfg.getCommunicationSpi()).blockMessages(blockMsgPred);
- spi.blockMessages(blockMessagePredicate);
- }
+ if (recordMsgPred != null)
+ ((TestRecordingCommunicationSpi) cfg.getCommunicationSpi()).record(recordMsgPred);
+
+ cfg.setFailureHandler(new StopNodeFailureHandler());
+ cfg.setConsistentId(gridName);
+ cfg.setUserAttributes(userAttrs);
return cfg;
}
@@ -344,7 +379,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
backups = 4;
// Prepare some data.
- IgniteEx crd = (IgniteEx) startGrids(3);
+ IgniteEx crd = startGrids(3);
crd.cluster().active(true);
@@ -362,7 +397,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
stopAllGrids();
// Rewrite data with globally disabled WAL.
- crd = (IgniteEx) startGrids(2);
+ crd = startGrids(2);
crd.cluster().active(true);
@@ -426,7 +461,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
backups = 4;
// Prepare some data.
- IgniteEx crd = (IgniteEx) startGrids(3);
+ IgniteEx crd = startGrids(3);
crd.cluster().active(true);
@@ -444,7 +479,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
stopAllGrids();
// Rewrite data to trigger further rebalance.
- IgniteEx supplierNode = (IgniteEx) startGrid(0);
+ IgniteEx supplierNode = startGrid(0);
supplierNode.cluster().active(true);
@@ -455,12 +490,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
forceCheckpoint();
- final int groupId = supplierNode.cachex(CACHE_NAME).context().groupId();
+ final int grpId = supplierNode.cachex(CACHE_NAME).context().groupId();
// Delay rebalance process for specified group.
- blockMessagePredicate = (node, msg) -> {
+ blockMsgPred = (node, msg) -> {
if (msg instanceof GridDhtPartitionDemandMessage)
- return ((GridDhtPartitionDemandMessage) msg).groupId() == groupId;
+ return ((GridDhtPartitionDemandMessage) msg).groupId() == grpId;
return false;
};
@@ -478,11 +513,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
);
// Inject I/O factory which can throw exception during WAL read on supplier node.
- FailingIOFactory ioFactory = new FailingIOFactory(new RandomAccessFileIOFactory());
-
- ((FileWriteAheadLogManager) supplierNode.cachex(CACHE_NAME).context().shared().wal()).setFileIOFactory(ioFactory);
-
- ioFactory.throwExceptionOnWalRead();
+ FailingIOFactory ioFactory = injectFailingIOFactory(supplierNode);
// Resume rebalance process.
TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) demanderNode.configuration().getCommunicationSpi();
@@ -490,12 +521,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
spi.stopBlock();
// Wait till rebalance will be failed and cancelled.
- Boolean result = preloader.rebalanceFuture().get();
+ Boolean res = preloader.rebalanceFuture().get();
- Assert.assertEquals("Rebalance should be cancelled on demander node: " + preloader.rebalanceFuture(), false, result);
+ Assert.assertEquals("Rebalance should be cancelled on demander node: " + preloader.rebalanceFuture(), false, res);
// Stop blocking messages and fail WAL during read.
- blockMessagePredicate = null;
+ blockMsgPred = null;
ioFactory.reset();
@@ -514,6 +545,445 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
}
/**
+     * Tests that the demander switches to full rebalance if two of the three previously chosen suppliers
+     * for a group fail to perform historical rebalance due to an unexpected error.
+     *
+     * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = "IGNITE_DISABLE_WAL_DURING_REBALANCING", value = "true")
+ public void testMultipleNodesFailHistoricalRebalance() throws Exception {
+ backups = 1;
+ int node_cnt = 4;
+ int demanderId = node_cnt - 1;
+
+ // Start a new cluster with 3 suppliers.
+ startGrids(node_cnt - 1);
+
+ // Start demander node.
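+        // The TEST_ATTR attribute is used by the affinity backup filter below
+        // to place a copy of every partition on the demander node.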
+ userAttrs.put("TEST_ATTR", "TEST_ATTR");
+ startGrid(node_cnt - 1);
+
+ grid(0).cluster().active(true);
+
+ // Create a new cache that places a full set of partitions on demander node.
+ RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, PARTS_CNT);
+ aff.setAffinityBackupFilter(new ClusterNodeAttributeAffinityBackupFilter("TEST_ATTR"));
+
+ String cacheName = "test-cache-1";
+ IgniteCache<Integer, IndexedObject> cache0 = grid(0).getOrCreateCache(
+ new CacheConfiguration<Integer, IndexedObject>(cacheName)
+ .setBackups(backups)
+ .setAffinity(aff)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC));
+
+ // Fill initial data and force checkpoint.
+ final int entryCnt = PARTS_CNT * 200;
+ for (int k = 0; k < entryCnt; k++)
+ cache0.put(k, new IndexedObject(k));
+
+ forceCheckpoint();
+
+ // Stop demander node.
+ stopGrid(demanderId);
+
+ // Rewrite data to trigger further rebalance.
+ for (int k = 0; k < entryCnt; k++) {
+            // Skip one partition (12) to be sure that after restarting the demander node
+            // it will have at least one partition in OWNING state, so WAL will not be disabled while rebalancing.
+ // This fact allows moving partitions to OWNING state during rebalancing
+ // even though the corresponding RebalanceFuture will be cancelled.
+ if (grid(0).affinity(cacheName).partition(k) != 12)
+ cache0.put(k, new IndexedObject(k));
+ }
+
+ forceCheckpoint();
+
+ // Delay rebalance process for specified group.
+ blockMsgPred = (node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+ return msg0.groupId() == CU.cacheId(cacheName);
+ }
+
+ return false;
+ };
+
+        Queue<RecordedDemandMessage> recordedMsgs = new ConcurrentLinkedQueue<>();
+
+ // Record demand messages for specified group.
+ recordMsgPred = (node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+ if (msg0.groupId() == CU.cacheId(cacheName)) {
+                    recordedMsgs.add(new RecordedDemandMessage(
+ node.id(),
+ msg0.groupId(),
+ msg0.partitions().hasFull(),
+ msg0.partitions().hasHistorical()));
+ }
+ }
+
+ return false;
+ };
+
+ // Corrupt WAL on suppliers, except the one.
+ injectFailingIOFactory(grid(0));
+ injectFailingIOFactory(grid(1));
+
+ // Trigger rebalance process from suppliers.
+ IgniteEx restartedDemander = startGrid(node_cnt - 1);
+
+ TestRecordingCommunicationSpi demanderSpi = TestRecordingCommunicationSpi.spi(restartedDemander);
+
+        // Wait until the demander starts historical rebalancing.
+ demanderSpi.waitForBlocked();
+
+ final IgniteInternalFuture<Boolean> preloadFut = restartedDemander.cachex(cacheName).context().group()
+ .preloader().rebalanceFuture();
+
+ // Unblock messages and start tracking demand and supply messages.
+ demanderSpi.stopBlock();
+
+        // Wait until rebalancing is cancelled for both suppliers.
+ assertTrue(
+ "Rebalance future was not cancelled [fut=" + preloadFut + ']',
+ GridTestUtils.waitForCondition(preloadFut::isDone, getTestTimeout()));
+
+ Assert.assertEquals(
+ "Rebalance should be cancelled on demander node: " + preloadFut,
+ false,
+ preloadFut.get());
+
+ awaitPartitionMapExchange(true, true, null);
+
+ // Check data consistency.
+ assertPartitionsSame(idleVerify(restartedDemander, cacheName));
+
+        // Check that historical rebalance switched to full for suppliers 1 and 2 and remained historical for supplier 3.
+ IgnitePredicate<RecordedDemandMessage> histPred = msg ->
+ msg.hasHistorical() && !msg.hasFull();
+
+ IgnitePredicate<RecordedDemandMessage> fullPred = msg ->
+ !msg.hasHistorical() && msg.hasFull();
+
+ IgniteInClosure<UUID> supplierChecker = supplierId -> {
+            List<RecordedDemandMessage> demandMsgsForSupplier = recordedMsgs.stream()
+                // Filter messages corresponding to the supplierId.
+ .filter(msg -> msg.supplierId().equals(supplierId))
+ .filter(msg -> msg.groupId() == CU.cacheId(cacheName))
+ // Filter out intermediate messages
+ .filter(msg -> msg.hasFull() || msg.hasHistorical())
+ .collect(toList());
+
+            assertEquals("There should be only two demand messages [supplierId=" + supplierId + ']',
+ 2,
+ demandMsgsForSupplier.size());
+ assertTrue(
+ "The first message should require historical rebalance [msg=" + demandMsgsForSupplier.get(0) + ']',
+ histPred.apply(demandMsgsForSupplier.get(0)));
+ assertTrue(
+                "The second message should require full rebalance [msg=" + demandMsgsForSupplier.get(1) + ']',
+ fullPred.apply(demandMsgsForSupplier.get(1)));
+ };
+
+ supplierChecker.apply(grid(0).cluster().localNode().id());
+ supplierChecker.apply(grid(1).cluster().localNode().id());
+
+        // Check supplier 3.
+        List<RecordedDemandMessage> demandMsgsForSupplier = recordedMsgs.stream()
+            // Filter messages corresponding to supplier 3.
+ .filter(msg -> msg.supplierId().equals(grid(2).cluster().localNode().id()))
+ .filter(msg -> msg.groupId() == CU.cacheId(cacheName))
+ // Filter out intermediate messages
+ .filter(msg -> msg.hasFull() || msg.hasHistorical())
+ .collect(toList());
+
+        assertEquals("There should be only one demand message.", 1, demandMsgsForSupplier.size());
+ assertTrue(
+ "The first message should require historical rebalance [msg=" + demandMsgsForSupplier.get(0) + ']',
+ histPred.apply(demandMsgsForSupplier.get(0)));
+ }
+
+ /**
+     * Tests that the demander switches to full rebalance if the previously chosen supplier for a group has failed
+     * to perform historical rebalance due to an unexpected error while the historical iterator (WAL iterator) is being created.
+     * Additionally, a client node joins the cluster between the demand message being sent and the supply message being received.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSwitchHistoricalRebalanceToFullAndClientJoin() throws Exception {
+ testSwitchHistoricalRebalanceToFull(IgniteWalRebalanceTest::injectFailingIOFactory, true);
+ }
+
+ /**
+     * Tests that the demander switches to full rebalance if the previously chosen supplier for a group has failed
+     * to perform historical rebalance due to an unexpected error while the historical iterator (WAL iterator) is being created.
+     *
+     * @throws Exception If failed.
+ */
+ @Test
+ public void testSwitchHistoricalRebalanceToFullDueToFailOnCreatingWalIterator() throws Exception {
+ testSwitchHistoricalRebalanceToFull(IgniteWalRebalanceTest::injectFailingIOFactory, false);
+ }
+
+ /**
+     * Tests that the demander switches to full rebalance if the previously chosen supplier for a group has failed
+     * to perform historical rebalance due to an unexpected error while iterating over the reserved WAL.
+     *
+     * @throws Exception If failed.
+ */
+ @Test
+ public void testSwitchHistoricalRebalanceToFullWhileIteratingOverWAL() throws Exception {
+ testSwitchHistoricalRebalanceToFull(supplier1 -> {
+ try {
+                // Corrupt a WAL record in order to fail historical rebalance from the supplier1 node.
+ IgniteWriteAheadLogManager walMgr = supplier1.context().cache().context().wal();
+
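+                // Log a dummy DELETE record just to obtain a WAL pointer at which the segment will be corrupted.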
+ FileWALPointer ptr = (FileWALPointer)walMgr.log(new DataRecord(new DataEntry(
+ CU.cacheId("test-cache-1"),
+ new KeyCacheObjectImpl(0, null, 0),
+ null,
+ GridCacheOperation.DELETE,
+ new GridCacheVersion(0, 1, 1, 0),
+ new GridCacheVersion(0, 1, 1, 0),
+ 0,
+ 0,
+ 0
+ )));
+
+ File walDir = U.field(walMgr, "walWorkDir");
+
+ List<FileDescriptor> walFiles = new IgniteWalIteratorFactory().resolveWalFiles(
+ new IgniteWalIteratorFactory.IteratorParametersBuilder().filesOrDirs(walDir));
+
+ FileDescriptor lastWalFile = walFiles.get(walFiles.size() - 1);
+
+ WalTestUtils.corruptWalSegmentFile(lastWalFile, ptr);
+
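+                // Write additional entries after the corrupted record; the historical iterator on the supplier
+                // is expected to fail when it reaches the corruption during rebalance.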
+ IgniteCache<Integer, IndexedObject> c1 = supplier1.cache("test-cache-1");
+ for (int i = 0; i < PARTS_CNT * 100; i++)
+ c1.put(i, new IndexedObject(i));
+ }
+ catch (IgniteCheckedException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, false);
+ }
+
+ /**
+     * Tests that the demander switches to full rebalance if the previously chosen supplier for a group has failed
+     * to perform historical rebalance due to an unexpected error.
+     *
+     * @param corruptWalClo Closure that corrupts WAL iteration on the supplier node.
+     * @param needClientStart {@code true} if a client node should join the cluster between
+     *                        the demand message being sent and the supply message being received.
+     * @throws Exception If failed.
+ */
+ public void testSwitchHistoricalRebalanceToFull(
+ IgniteInClosure<IgniteEx> corruptWalClo,
+ boolean needClientStart
+ ) throws Exception {
+ backups = 3;
+
+ IgniteEx supplier1 = startGrid(0);
+ IgniteEx supplier2 = startGrid(1);
+ IgniteEx demander = startGrid(2);
+
+ supplier1.cluster().active(true);
+
+ String supplier1Name = supplier1.localNode().consistentId().toString();
+ String supplier2Name = supplier2.localNode().consistentId().toString();
+ String demanderName = demander.localNode().consistentId().toString();
+
+ String cacheName1 = "test-cache-1";
+ String cacheName2 = "test-cache-2";
+
+ // Cache resides on supplier1 and demander nodes.
+ IgniteCache<Integer, IndexedObject> c1 = supplier1.getOrCreateCache(
+ new CacheConfiguration<Integer, IndexedObject>(cacheName1)
+ .setBackups(backups)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setRebalanceOrder(10)
+ .setNodeFilter(n -> n.consistentId().equals(supplier1Name) || n.consistentId().equals(demanderName)));
+
+ // Cache resides on supplier2 and demander nodes.
+ IgniteCache<Integer, IndexedObject> c2 = supplier1.getOrCreateCache(
+            new CacheConfiguration<Integer, IndexedObject>(cacheName2)
+ .setBackups(backups)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+ .setRebalanceOrder(20)
+ .setNodeFilter(n -> n.consistentId().equals(supplier2Name) || n.consistentId().equals(demanderName)));
+
+ // Fill initial data.
+ final int entryCnt = PARTS_CNT * 200;
+ for (int k = 0; k < entryCnt; k++) {
+ c1.put(k, new IndexedObject(k));
+
+ c2.put(k, new IndexedObject(k));
+ }
+
+ forceCheckpoint();
+
+ stopGrid(2);
+
+ // Rewrite data to trigger further rebalance.
+ for (int i = 0; i < entryCnt; i++) {
+ c1.put(i, new IndexedObject(i));
+
+ c2.put(i, new IndexedObject(i));
+ }
+
+ // Delay rebalance process for specified groups.
+ blockMsgPred = (node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+ return msg0.groupId() == CU.cacheId(cacheName1) || msg0.groupId() == CU.cacheId(cacheName2);
+ }
+
+ return false;
+ };
+
+        Queue<RecordedDemandMessage> recordedMsgs = new ConcurrentLinkedQueue<>();
+
+ // Record demand messages for specified groups.
+ recordMsgPred = (node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+ if (msg0.groupId() == CU.cacheId(cacheName1) || msg0.groupId() == CU.cacheId(cacheName2)) {
+                    recordedMsgs.add(new RecordedDemandMessage(
+ node.id(),
+ msg0.groupId(),
+ msg0.partitions().hasFull(),
+ msg0.partitions().hasHistorical()));
+ }
+ }
+
+ return false;
+ };
+
+ // Delay rebalance process for specified group from supplier2.
+ TestRecordingCommunicationSpi supplierSpi2 = TestRecordingCommunicationSpi.spi(supplier2);
+ supplierSpi2.blockMessages((node, msg) -> {
+ if (msg instanceof GridDhtPartitionSupplyMessage) {
+ GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)msg;
+
+ return node.consistentId().equals(demanderName) && msg0.groupId() == CU.cacheId(cacheName2);
+ }
+
+ return false;
+ });
+
+ // Corrupt WAL on supplier1
+ corruptWalClo.apply(supplier1);
+
+ // Trigger rebalance process from suppliers.
+ IgniteEx restartedDemander = startGrid(2);
+
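+        // Reset the predicates so that nodes started later (e.g. the optional client below) do not install them.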
+ recordMsgPred = null;
+ blockMsgPred = null;
+
+ TestRecordingCommunicationSpi demanderSpi = TestRecordingCommunicationSpi.spi(grid(2));
+
+        // Wait until the demander starts historical rebalancing.
+ demanderSpi.waitForBlocked();
+
+ final IgniteInternalFuture<Boolean> preloadFut1 = restartedDemander.cachex(cacheName1).context().group()
+ .preloader().rebalanceFuture();
+ final IgniteInternalFuture<Boolean> preloadFut2 = restartedDemander.cachex(cacheName2).context().group()
+ .preloader().rebalanceFuture();
+
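+        // Emulate a client join between sending the demand message and receiving the supply message
+        // (see the test description).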
+ if (needClientStart)
+ startClientGrid(3);
+
+ // Unblock messages and start tracking demand and supply messages.
+ demanderSpi.stopBlock();
+
+        // Wait until rebalancing is cancelled for both suppliers.
+        assertTrue(
+            "Rebalance futures were not completed [fut1=" + preloadFut1 + ", fut2=" + preloadFut2 + ']',
+            GridTestUtils.waitForCondition(() -> preloadFut1.isDone() && preloadFut2.isDone(), getTestTimeout()));
+
+ Assert.assertEquals(
+ "Rebalance should be cancelled on demander node: " + preloadFut1,
+ false,
+ preloadFut1.get());
+ Assert.assertEquals(
+ "Rebalance should be cancelled on demander node: " + preloadFut2,
+ false,
+ preloadFut2.get());
+
+ // Unblock supply messages from supplier2
+ supplierSpi2.stopBlock();
+
+ awaitPartitionMapExchange(true, true, null);
+
+ // Check data consistency.
+ assertPartitionsSame(idleVerify(restartedDemander, cacheName2, cacheName1));
+
+ // Check that historical rebalance switched to full for supplier1 and it is still historical for supplier2.
+ IgnitePredicate<RecordedDemandMessage> histPred = (msg) ->
+ msg.hasHistorical() && !msg.hasFull();
+
+ IgnitePredicate<RecordedDemandMessage> fullPred = (msg) ->
+ !msg.hasHistorical() && msg.hasFull();
+
+ // Supplier1
+        List<RecordedDemandMessage> demandMsgsForSupplier1 = recordedMsgs.stream()
+            // Filter messages corresponding to supplier1.
+ .filter(msg -> msg.groupId() == CU.cacheId(cacheName1))
+ // Filter out intermediate messages
+ .filter(msg -> msg.hasFull() || msg.hasHistorical())
+ .collect(toList());
+
+        assertEquals("There should be only two demand messages.", 2, demandMsgsForSupplier1.size());
+ assertTrue(
+ "The first message should require historical rebalance [msg=" + demandMsgsForSupplier1.get(0) + ']',
+ histPred.apply(demandMsgsForSupplier1.get(0)));
+ assertTrue(
+            "The second message should require full rebalance [msg=" + demandMsgsForSupplier1.get(1) + ']',
+ fullPred.apply(demandMsgsForSupplier1.get(1)));
+
+ // Supplier2
+        List<RecordedDemandMessage> demandMsgsForSupplier2 = recordedMsgs.stream()
+            // Filter messages corresponding to supplier2.
+ .filter(msg -> msg.groupId() == CU.cacheId(cacheName2))
+ // Filter out intermediate messages
+ .filter(msg -> msg.hasFull() || msg.hasHistorical())
+ .collect(toList());
+
+        assertEquals("There should be only two demand messages.", 2, demandMsgsForSupplier2.size());
+ assertTrue(
+ "Both messages should require historical rebalance [" +
+ "msg=" + demandMsgsForSupplier2.get(0) + ", msg=" + demandMsgsForSupplier2.get(1) + ']',
+ histPred.apply(demandMsgsForSupplier2.get(0)) && histPred.apply(demandMsgsForSupplier2.get(1)));
+ }
+
+ /**
+     * Injects a new instance of FailingIOFactory into the WAL manager of the given supplier node.
+     * This allows breaking historical rebalance from the supplier.
+ *
+ * @param supplier Supplier node to be modified.
+ * @return Instance of FailingIOFactory that was injected.
+ */
+ private static FailingIOFactory injectFailingIOFactory(IgniteEx supplier) {
+        // Inject an I/O factory which can throw an exception during WAL read on the given supplier node.
+ FailingIOFactory ioFactory = new FailingIOFactory(new RandomAccessFileIOFactory());
+
+ ((FileWriteAheadLogManager)supplier.context().cache().context().wal()).setFileIOFactory(ioFactory);
+
+ ioFactory.throwExceptionOnWalRead();
+
+ return ioFactory;
+ }
+
+ /**
*
*/
private static class IndexedObject {
@@ -641,7 +1111,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
for (int i = entryCnt / 2; i < entryCnt; i++)
cache0.put(i, String.valueOf(i));
- blockMessagePredicate = (node, msg) -> {
+ blockMsgPred = (node, msg) -> {
if (msg instanceof GridDhtPartitionDemandMessage) {
GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
@@ -664,7 +1134,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
// Wait until the full rebalance begins with g1 as a supplier.
spi2.waitForBlocked(2);
- blockMessagePredicate = null;
+ blockMsgPred = null;
startGrid(0); // Should not force rebalancing remap.
@@ -725,4 +1195,71 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
failRead = false;
}
}
+
+ /** */
+ static class RecordedDemandMessage {
+ /** Full rebalance. */
+ private final boolean full;
+
+ /** Historical rebalance. */
+ private final boolean historical;
+
+ /** Supplier node id. */
+ private final UUID supplierId;
+
+ /** Group id. */
+ private final int grpId;
+
+ /**
+ * Creates a new instance.
+ * @param supplierId Supplier node id.
+ * @param grpId Cache group id.
+ * @param full {@code true} if demand message has partitions that should be fully rebalanced.
+         * @param historical {@code true} if demand message has partitions that should be rebalanced historically (from WAL).
+ */
+ RecordedDemandMessage(UUID supplierId, int grpId, boolean full, boolean historical) {
+ this.supplierId = supplierId;
+ this.grpId = grpId;
+ this.full = full;
+ this.historical = historical;
+ }
+
+ /**
+ * @return Supplier node id.
+ */
+ UUID supplierId() {
+ return supplierId;
+ }
+
+ /**
+ * @return cache group id.
+ */
+ int groupId() {
+ return grpId;
+ }
+
+ /**
+ * @return {@code true} if demand message has partitions that should be fully rebalanced.
+ */
+ boolean hasFull() {
+ return full;
+ }
+
+ /**
+         * @return {@code true} if demand message has partitions that should be rebalanced historically (from WAL).
+ */
+ boolean hasHistorical() {
+ return historical;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "RecordedDemandMessage{" +
+ "supplierId=" + supplierId +
+ ", groupId=" + grpId +
+ ", full=" + full +
+ ", historical=" + historical +
+ '}';
+ }
+ }
}