You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/02 12:58:58 UTC
[44/46] ignite git commit: 1093
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9abfc606
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9abfc606
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9abfc606
Branch: refs/heads/ignite-1093-2
Commit: 9abfc60694ddb2cc4dc11cd41124c79c09d083dd
Parents: e1651dd
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Sep 28 14:39:52 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Sep 28 14:39:52 2015 +0300
----------------------------------------------------------------------
.../GridDhtPartitionDemandMessage.java | 7 ++
.../dht/preloader/GridDhtPartitionDemander.java | 68 ++++++++++++--------
.../dht/preloader/GridDhtPartitionSupplier.java | 7 +-
3 files changed, 53 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9abfc606/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 06ac54b..6c60930 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -123,6 +123,13 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
}
/**
+ * @param updateSeq Update sequence.
+ */
+ void updateSequence(long updateSeq) {
+ this.updateSeq = updateSeq;
+ }
+
+ /**
* @return Update sequence.
*/
long updateSequence() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9abfc606/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 345e3bd..5d4db40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -64,7 +64,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -110,6 +110,9 @@ public class GridDhtPartitionDemander {
/** Demand lock. */
private final ReadWriteLock demandLock;
+ /** Rebalancing iteration counter. */
+ private long updateSeq = 0;
+
/**
* @param cctx Cctx.
* @param demandLock Demand lock.
@@ -194,7 +197,10 @@ public class GridDhtPartitionDemander {
* @return {@code True} if topology changed.
*/
private boolean topologyChanged(SyncFuture fut) {
- return !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || fut != syncFut;
+ return
+ !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed.
+ fut != syncFut || // Same topology, but dummy exchange forced because of missing partitions.
+ cctx.shared().exchange().hasPendingExchange(); // New topology pending.
}
/**
@@ -256,20 +262,20 @@ public class GridDhtPartitionDemander {
final SyncFuture oldFut = syncFut;
- if (!oldFut.isDummy() && assigns.topologyVersion().compareTo(oldFut.topologyVersion()) < 0) {
- U.log(log, "Skipping obsolete (dummy) exchange. [top=" + assigns.topologyVersion() + "]");
+ if (cctx.shared().exchange().hasPendingExchange()) { // Will rebalance at actual topology.
+ U.log(log, "Skipping obsolete exchange. [top=" + assigns.topologyVersion() + "]");
return;
}
- final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
+ final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy(), ++updateSeq);
if (!oldFut.isDummy())
oldFut.cancel();
else
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> future) {
- oldFut.onDone();
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> future) {
+ oldFut.onDone(fut.result());
}
});
@@ -385,13 +391,13 @@ public class GridDhtPartitionDemander {
final CacheConfiguration cfg = cctx.config();
- U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
- ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
- ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + "]");
-
//Check remote node rebalancing API version.
if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
- fut.appendPartitions(node.id(), d.partitions(), d.updateSequence());
+ U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+ ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+ ", topology=" + d.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+
+ fut.appendPartitions(node.id(), d.partitions());
int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
@@ -415,6 +421,7 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
+ initD.updateSequence(fut.updateSeq);
try {
if (!topologyChanged(fut)) {
@@ -442,9 +449,13 @@ public class GridDhtPartitionDemander {
}
}
else {
+ U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+ ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+ ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + "]");
+
DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
- fut.appendPartitions(node.id(), d.partitions(), d.updateSequence());
+ fut.appendPartitions(node.id(), d.partitions());
dw.run(node, d);
}
@@ -516,7 +527,9 @@ public class GridDhtPartitionDemander {
assert node != null;
- if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut) || !fut.isActual(id, supply.updateSequence()))
+ if (!fut.topologyVersion().equals(topVer) || // Current future based on another topology.
+ topologyChanged(fut) || // Topology already changed (for current future) or new topology pending.
+ !fut.isActual(supply.updateSequence())) // Current future has the same topology, but a different update sequence.
return;
if (log.isDebugEnabled())
@@ -761,7 +774,7 @@ public class GridDhtPartitionDemander {
private final IgniteLogger log;
/** Remaining. T3: startTime, partitions, updateSequence */
- private final Map<UUID, T3<Long, Collection<Integer>, Long>> remaining = new HashMap<>();
+ private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<>();
/** Missed. */
private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
@@ -776,6 +789,9 @@ public class GridDhtPartitionDemander {
/** Topology version. */
private final AffinityTopologyVersion topVer;
+ /** Unique (per demander) sequence id. */
+ private final long updateSeq;
+
/**
* @param assigns Assigns.
* @param cctx Context.
@@ -785,7 +801,8 @@ public class GridDhtPartitionDemander {
SyncFuture(GridDhtPreloaderAssignments assigns,
GridCacheContext<?, ?> cctx,
IgniteLogger log,
- boolean sentStopEvnt) {
+ boolean sentStopEvnt,
+ long updateSeq) {
assert assigns != null;
this.exchFut = assigns.exchangeFuture();
@@ -793,6 +810,7 @@ public class GridDhtPartitionDemander {
this.cctx = cctx;
this.log = log;
this.sendStoppedEvnt = sentStopEvnt;
+ this.updateSeq = updateSeq;
if (assigns != null)
cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
@@ -812,6 +830,7 @@ public class GridDhtPartitionDemander {
this.cctx = null;
this.log = null;
this.sendStoppedEvnt = false;
+ this.updateSeq = -1;
}
/**
@@ -822,14 +841,11 @@ public class GridDhtPartitionDemander {
}
/**
- * @param nodeId Node id.
* @param updateSeq Update sequence.
* @return true in case future created for specified updateSeq, false in other case.
*/
- private boolean isActual(UUID nodeId, long updateSeq) {
- T3<Long, Collection<Integer>, Long> t = remaining.get(nodeId);
-
- return t != null ? t.get3().equals(updateSeq) : false;
+ private boolean isActual(long updateSeq) {
+ return this.updateSeq == updateSeq;
}
/**
@@ -843,11 +859,11 @@ public class GridDhtPartitionDemander {
* @param nodeId Node id.
* @param parts Parts.
*/
- private void appendPartitions(UUID nodeId, Collection<Integer> parts, long updateSeq) {
+ private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
lock.lock();
try {
- remaining.put(nodeId, new T3<>(U.currentTimeMillis(), parts, updateSeq));
+ remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
}
finally {
lock.unlock();
@@ -1027,10 +1043,10 @@ public class GridDhtPartitionDemander {
if (!m.isEmpty()) {
U.log(log, ("Reassigning partitions that were missed: " + m));
- cctx.shared().exchange().forceDummyExchange(true, exchFut);
-
onDone(false); //Finished but has missed partitions and forced dummy exchange
+ cctx.shared().exchange().forceDummyExchange(true, exchFut);
+
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9abfc606/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index e23a50b..81e2fa4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -105,7 +105,7 @@ class GridDhtPartitionSupplier {
SupplyContext sc = entry.getValue();
- if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null)
+ if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null)
clearContext(scMap, t, sc, log);
}
}
@@ -128,7 +128,7 @@ class GridDhtPartitionSupplier {
}
};
- cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
+ cctx.events().addListener(lsnr, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED);
startOldListeners();
}
@@ -532,7 +532,8 @@ class GridDhtPartitionSupplier {
if (log.isDebugEnabled())
log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
- if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+ if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()) || // Topology already changed.
+ cctx.shared().exchange().hasPendingExchange()) // New topology pending.
return true;
cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());