Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/11 18:32:07 UTC
[1/8] incubator-ignite git commit: ignite-1093 Non stop rebalancing
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1093 c92cd899a -> 50e188df2
ignite-1093 Non stop rebalancing
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4776feca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4776feca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4776feca
Branch: refs/heads/ignite-1093
Commit: 4776fecaf059d11d4a4f3ff57634ebad9e41f451
Parents: c92cd89
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 7 18:45:07 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 7 18:45:07 2015 +0300
----------------------------------------------------------------------
.../preloader/GridDhtPartitionDemandPool.java | 176 +++++++++----------
.../preloader/GridDhtPartitionSupplyPool.java | 80 ++++++---
.../GridCacheMassiveRebalancingSelfTest.java | 10 +-
3 files changed, 144 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
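For orientation: the core change in this commit replaces the one-ordered-topic-per-partition demand scheme with one ordered topic per rebalance thread, splitting the demanded partitions round-robin across those threads (the sParts loop in the diff below). A minimal standalone sketch of that split, using hypothetical names rather than the actual Ignite classes:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PartitionSplitSketch {
    /** Splits demanded partitions round-robin across rebalance threads. */
    static List<Set<Integer>> split(Iterable<Integer> partitions, int threadCnt) {
        List<Set<Integer>> perThread = new ArrayList<>(threadCnt);

        for (int i = 0; i < threadCnt; i++)
            perThread.add(new HashSet<Integer>());

        int cnt = 0;

        for (int p : partitions)
            perThread.get(cnt++ % threadCnt).add(p);

        return perThread;
    }

    public static void main(String[] args) {
        // 8 partitions split across 3 rebalance threads: {0,3,6}, {1,4,7}, {2,5}.
        System.out.println(split(java.util.Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7), 3));
    }
}

Spreading partitions this way keeps each rebalance thread busy with its own subset instead of creating a separate topic and handler per partition.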
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 0e0bc01..11645e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -225,6 +225,14 @@ public class GridDhtPartitionDemandPool {
}
/**
+ * @param idx
+ * @return topic
+ */
+ static Object topic(int idx, int cacheId, UUID nodeId) {
+ return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: remove nodeId
+ }
+
+ /**
*
*/
private void leaveBusy() {
@@ -537,39 +545,58 @@ public class GridDhtPartitionDemandPool {
if (isCancelled() || topologyChanged())
return missed;
- for (int p : d.partitions()) {
- cctx.io().addOrderedHandler(topic(p, topVer.topologyVersion()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
- handleSupplyMessage(new SupplyMessage(nodeId, msg), node, topVer, top, remaining,
- exchFut, missed, d);
+ int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count
+
+ List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
+
+ int cnt = 0;
+
+ while (cnt < threadCnt) {
+ sParts.add(new HashSet<Integer>());
+
+ final int idx = cnt;
+
+ cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
+ handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
+ exchFut, missed, d, remaining);
}
});
+
+ cnt++;
}
- try {
- Iterator<Integer> it = remaining.keySet().iterator();
+ Iterator<Integer> it = d.partitions().iterator();
- final int maxC = cctx.config().getRebalanceThreadPoolSize();
+ cnt = 0;
- int sent = 0;
+ while (it.hasNext()) {
+ sParts.get(cnt % threadCnt).add(it.next());
- while (sent < maxC && it.hasNext()) {
- int p = it.next();
+ cnt++;
+ }
- boolean res = remaining.replace(p, false, true);
+ try {
+ cnt = 0;
- assert res;
+ while (cnt < threadCnt) {
// Create copy.
- GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+ GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
- initD.topic(topic(p, topVer.topologyVersion()));
+ initD.topic(topic(cnt, cctx.cacheId(),node.id()));
- // Send initial demand message.
- cctx.io().sendOrderedMessage(node,
- GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+ try {
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id());
- sent++;
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partition demand message to local node", e);
+ }
+
+ cnt++;
}
do {
@@ -580,41 +607,41 @@ public class GridDhtPartitionDemandPool {
return missed;
}
finally {
- for (int p : d.partitions())
- cctx.io().removeOrderedHandler(topic(p, topVer.topologyVersion()));
+ cnt = 0;
+
+ while (cnt < threadCnt) {
+ cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), node.id()));
+
+ cnt++;
+ }
}
}
- /**
- * @param p
- * @param topVer
- * @return topic
- */
- private Object topic(int p, long topVer) {
- return TOPIC_CACHE.topic("DemandPool" + topVer, cctx.cacheId(), p);//Todo topVer as long
- }
+ boolean logg = false;
/**
* @param s Supply message.
* @param node Node.
* @param topVer Topology version.
* @param top Topology.
- * @param remaining Remaining.
* @param exchFut Exchange future.
* @param missed Missed.
* @param d initial DemandMessage.
*/
private void handleSupplyMessage(
+ int idx,
SupplyMessage s,
ClusterNode node,
AffinityTopologyVersion topVer,
GridDhtPartitionTopology top,
- ConcurrentHashMap8<Integer, Boolean> remaining,
GridDhtPartitionsExchangeFuture exchFut,
Set<Integer> missed,
- GridDhtPartitionDemandMessage d) {
+ GridDhtPartitionDemandMessage d,
+ ConcurrentHashMap8 remaining) {
+
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id());
- //Todo: check it still actual and remove
// Check that message was received from expected node.
if (!s.senderId().equals(node.id())) {
U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
@@ -639,10 +666,8 @@ public class GridDhtPartitionDemandPool {
return;
}
- assert supply.infos().entrySet().size() == 1;//Todo: remove after supply message refactoring
-
// Preload.
- for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {//todo:only one partition (supply refactoring)
+ for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
int p = e.getKey();
if (cctx.affinity().localNode(p, topVer)) {
@@ -685,12 +710,17 @@ public class GridDhtPartitionDemandPool {
}
}
- boolean last = supply.last().contains(p);//Todo: refactor as boolean "last"
+ boolean last = supply.last().contains(p);
// If message was last for this partition,
// then we take ownership.
if (last) {
- top.own(part);
+ top.own(part);//todo: close future?
+
+// if (logg && cctx.name().equals("cache"))
+// System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
+
+ remaining.remove(p);
if (log.isDebugEnabled())
log.debug("Finished rebalancing partition: " + part);
@@ -698,29 +728,6 @@ public class GridDhtPartitionDemandPool {
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
exchFut.discoveryEvent());
-
- remaining.remove(p);
-
- demandNextPartition(node, remaining, d, topVer);
- }
- else {
- try {
- // Create copy.
- GridDhtPartitionDemandMessage nextD =
- new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
-
- nextD.topic(topic(p, topVer.topologyVersion()));
-
- // Send demand message.
- cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
- nextD, cctx.ioPolicy(), d.timeout());
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to receive partitions from node (rebalancing will not " +
- "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
-
- cancel();
- }
}
}
finally {
@@ -743,48 +750,35 @@ public class GridDhtPartitionDemandPool {
}
}
- for (Integer miss : s.supply().missed()) // Todo: miss as param, not collection
+ for (Integer miss : s.supply().missed())
remaining.remove(miss);
// Only request partitions based on latest topology version.
for (Integer miss : s.supply().missed())
if (cctx.affinity().localNode(miss, topVer))
missed.add(miss);
- }
- /**
- * @param node Node.
- * @param remaining Remaining.
- * @param d initial DemandMessage.
- * @param topVer Topology version.
- */
- private void demandNextPartition(
- final ClusterNode node,
- final ConcurrentHashMap8<Integer, Boolean> remaining,
- final GridDhtPartitionDemandMessage d,
- final AffinityTopologyVersion topVer
- ) {
- try {
- for (Integer p : remaining.keySet()) {
- if (remaining.replace(p, false, true)) {
- // Create copy.
- GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+ if (!remaining.isEmpty()) {
+ try {
+ // Create copy.
+ GridDhtPartitionDemandMessage nextD =
+ new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
- nextD.topic(topic(p, topVer.topologyVersion()));
+ nextD.topic(topic(idx, cctx.cacheId(), node.id()));
- // Send demand message.
- cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
- nextD, cctx.ioPolicy(), d.timeout());
+ // Send demand message.
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(idx, cctx.cacheId()),
+ nextD, cctx.ioPolicy(), d.timeout());
- break;
- }
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("D " + idx + " ack " + cctx.localNode().id());
}
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to receive partitions from node (rebalancing will not " +
- "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+ "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
- cancel();
+ cancel();
+ }
}
}
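The demand side now registers one ordered handler per rebalance thread index and removes all of them in the finally block once demanding completes. A self-contained sketch of that registration pattern, with a toy message-bus stand-in instead of cctx.io() (all names here are illustrative, not Ignite API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Toy stand-in for the ordered-handler registry; not the Ignite IO manager. */
class OrderedIo {
    private final Map<Object, BiConsumer<Object, Object>> handlers = new ConcurrentHashMap<>();

    void addOrderedHandler(Object topic, BiConsumer<Object, Object> h) { handlers.put(topic, h); }

    void removeOrderedHandler(Object topic) { handlers.remove(topic); }
}

public class DemandHandlersSketch {
    /** One handler per rebalance thread index; all of them removed when demanding finishes. */
    static void demand(OrderedIo io, int cacheId, int threadCnt) {
        for (int idx = 0; idx < threadCnt; idx++) {
            final int i = idx;

            io.addOrderedHandler(topic(i, cacheId), (nodeId, msg) ->
                System.out.println("thread " + i + " got supply message from " + nodeId));
        }

        try {
            // ... send initial demand messages and wait for the remaining set to drain ...
        }
        finally {
            for (int idx = 0; idx < threadCnt; idx++)
                io.removeOrderedHandler(topic(idx, cacheId));
        }
    }

    static Object topic(int idx, int cacheId) {
        return "DemandPool-" + cacheId + "-" + idx; // Topic identity: (prefix, cacheId, thread index).
    }
}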
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index f10837a..c1c9941 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -76,25 +76,32 @@ class GridDhtPartitionSupplyPool {
top = cctx.dht().topology();
+ int cnt = 0;
+
if (!cctx.kernalContext().clientNode()) {
- for (int p = 0; p <= cctx.affinity().partitions(); p++)
- cctx.io().addOrderedHandler(topic(p, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ while (cnt < cctx.config().getRebalanceThreadPoolSize()) {
+ final int idx = cnt;
+
+ cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
@Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- processMessage(m, id);
+ processMessage(m, id, idx);
}
});
+
+ cnt++;
+ }
}
depEnabled = cctx.gridDeploy().enabled();
}
/**
- * @param p Partition.
+ * @param idx Index.
* @param id Node id.
* @return topic
*/
- static Object topic(int p, int id) {
- return TOPIC_CACHE.topic("SupplyPool", id, p);
+ static Object topic(int idx, int id) {
+ return TOPIC_CACHE.topic("SupplyPool", idx, id);
}
/**
@@ -119,44 +126,65 @@ class GridDhtPartitionSupplyPool {
this.preloadPred = preloadPred;
}
+ boolean logg = false;
+
/**
* @param d Demand message.
* @param id Node uuid.
*/
- private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
+ private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
assert d != null;
assert id != null;
+ if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+ return;
+
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("S " + idx + " process message " + cctx.localNode().id());
+
GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
d.updateSequence(), cctx.cacheId());
long preloadThrottle = cctx.config().getRebalanceThrottle();
- long maxBatchesCnt = 3;//Todo: param
-
ClusterNode node = cctx.discovery().node(id);
- boolean ack = false;
-
T2<UUID, Object> scId = new T2<>(id, d.topic());
try {
SupplyContext sctx = scMap.remove(scId);
- if (doneMap.get(scId) != null)//Todo: refactor
+ if (!d.partitions().isEmpty()) {//Only first request contains partitions.
+ doneMap.remove(scId);
+ }
+
+ if (doneMap.get(scId) != null) {
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("S " + idx + " exit " + cctx.localNode().id());
+
return;
+ }
long bCnt = 0;
int phase = 0;
- if (sctx != null)
+ boolean newReq = true;
+
+ long maxBatchesCnt = 3;//Todo: param
+
+ if (sctx != null) {
phase = sctx.phase;
+ maxBatchesCnt = 1;
+ }
+
Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
- while (sctx != null || partIt.hasNext()) {
- int part = sctx != null ? sctx.part : partIt.next();
+ while ((sctx != null && newReq) || partIt.hasNext()) {
+ int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+ newReq = false;
GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
@@ -206,8 +234,6 @@ class GridDhtPartitionSupplyPool {
}
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
if (!reply(node, d, s))
return;
@@ -223,6 +249,9 @@ class GridDhtPartitionSupplyPool {
return;
}
else {
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
+
s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
cctx.cacheId());
}
@@ -275,8 +304,6 @@ class GridDhtPartitionSupplyPool {
}
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
if (!reply(node, d, s))
return;
@@ -382,8 +409,6 @@ class GridDhtPartitionSupplyPool {
}
if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
-
if (!reply(node, d, s))
return;
@@ -415,11 +440,12 @@ class GridDhtPartitionSupplyPool {
// Mark as last supply message.
s.last(part);
- if (ack) {
- s.markAck();
+// if (logg && cctx.name().equals("cache"))
+// System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
- break; // Partition for loop.
- }
+ phase = 0;
+
+ sctx = null;
}
finally {
loc.release();
@@ -442,13 +468,15 @@ class GridDhtPartitionSupplyPool {
/**
* @param n Node.
- * @param d Demand message.
* @param s Supply message.
* @return {@code True} if message was sent, {@code false} if recipient left grid.
* @throws IgniteCheckedException If failed.
*/
private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
throws IgniteCheckedException {
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("S sent "+ cctx.localNode().id());
+
try {
if (log.isDebugEnabled())
log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index 11ea8f6..4992d19 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -34,7 +34,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
/** */
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- private static int TEST_SIZE = 1_024_000;
+ private static int TEST_SIZE = 10_024_000;
/** cache name. */
protected static String CACHE_NAME_DHT = "cache";
@@ -58,7 +58,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
cacheCfg.setName(CACHE_NAME_DHT);
cacheCfg.setCacheMode(CacheMode.PARTITIONED);
- cacheCfg.setRebalanceBatchSize(100 * 1024);
+ //cacheCfg.setRebalanceBatchSize(1024);
cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
cacheCfg.setRebalanceThreadPoolSize(4);
//cacheCfg.setRebalanceTimeout(1000000);
@@ -107,7 +107,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
long start = System.currentTimeMillis();
- //startGrid(1);
+ startGrid(1);
startGrid(2);
@@ -115,9 +115,9 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
stopGrid(0);
- //Thread.sleep(20000);
+ Thread.sleep(20000);
- //stopGrid(1);
+ stopGrid(1);
checkData(grid(2));
[5/8] incubator-ignite git commit: ignite-1093
Posted by av...@apache.org.
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d0b7d9fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d0b7d9fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d0b7d9fc
Branch: refs/heads/ignite-1093
Commit: d0b7d9fca8713aefeb6e6477679efb9d7a8db9e0
Parents: 76ba5d9
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 11 18:09:00 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 11 18:09:00 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCachePreloader.java | 2 +-
.../cache/GridCachePreloaderAdapter.java | 2 +-
.../dht/preloader/GridDhtPartitionDemander.java | 941 +++++++------------
.../dht/preloader/GridDhtPartitionSupplier.java | 21 +-
.../dht/preloader/GridDhtPreloader.java | 2 +-
5 files changed, 328 insertions(+), 640 deletions(-)
----------------------------------------------------------------------
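This commit drops the DemandWorker thread pool in favour of message-driven rebalancing: supply messages are handled directly by ordered listeners, and a SyncFuture tracks the partitions still expected from each supplier node. A simplified stand-in for that per-node bookkeeping (hypothetical class, concurrent sets assumed, much coarser than the real SyncFuture):

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class RemainingTrackerSketch {
    /** Partitions still expected from each supplier node; callers pass concurrent sets. */
    private final Map<UUID, Set<Integer>> remaining = new ConcurrentHashMap<>();

    private final CountDownLatch done = new CountDownLatch(1);

    void append(UUID nodeId, Set<Integer> parts) { remaining.put(nodeId, parts); }

    void onPartitionDone(UUID nodeId, int p) {
        Set<Integer> parts = remaining.get(nodeId);

        if (parts == null)
            return;

        parts.remove(p);

        if (parts.isEmpty())
            remaining.remove(nodeId);

        if (remaining.isEmpty())
            done.countDown(); // Signals completion once every node reported all its partitions.
    }

    void cancel(UUID nodeId) {
        remaining.remove(nodeId);

        if (remaining.isEmpty())
            done.countDown();
    }

    void await() throws InterruptedException { done.await(); }
}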
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b8bb08e..1e915eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -91,7 +91,7 @@ public interface GridCachePreloader {
* @param assignments Assignments to add.
* @param forcePreload Force preload flag.
*/
- public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
+ public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException;
/**
* @param p Preload predicate.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 0adf510..68deb2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -142,7 +142,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
// No-op.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fdd101e..e177dae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -27,30 +27,25 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
import org.jsr166.*;
import java.util.*;
-import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
-import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.GridTopic.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.processors.dr.GridDrType.*;
/**
- * Thread pool for requesting partitions from other nodes
- * and populating local cache.
+ * Thread pool for requesting partitions from other nodes and populating local cache.
*/
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
public class GridDhtPartitionDemander {
@@ -63,35 +58,25 @@ public class GridDhtPartitionDemander {
/** */
private final ReadWriteLock busyLock;
- /** */
- @GridToStringInclude
- private final Collection<DemandWorker> dmdWorkers;
-
/** Preload predicate. */
private IgnitePredicate<GridCacheEntryInfo> preloadPred;
/** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
@GridToStringInclude
- private SyncFuture syncFut;
-
- /** Preload timeout. */
- private final AtomicLong timeout;
-
- /** Allows demand threads to synchronize their step. */
- private CyclicBarrier barrier;
+ private volatile SyncFuture syncFut;
/** Demand lock. */
private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
- /** */
- private int poolSize;
-
/** Last timeout object. */
private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
/** Last exchange future. */
private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+ /** Assignments. */
+ private volatile GridDhtPreloaderAssignments assigns;
+
/**
* @param cctx Cache context.
* @param busyLock Shutdown lock.
@@ -107,53 +92,47 @@ public class GridDhtPartitionDemander {
boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
- poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
if (enabled) {
- barrier = new CyclicBarrier(poolSize);
- dmdWorkers = new ArrayList<>(poolSize);
+ for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
+ final int idx = cnt;
- for (int i = 0; i < poolSize; i++)
- dmdWorkers.add(new DemandWorker(i));
+ cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessage m) {
+ enterBusy();
- syncFut = new SyncFuture(dmdWorkers);
+ try {
+ handleSupplyMessage(idx, id, m);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ }
}
- else {
- dmdWorkers = Collections.emptyList();
- syncFut = new SyncFuture(dmdWorkers);
+ syncFut = new SyncFuture();
+ if (!enabled)
// Calling onDone() immediately since preloading is disabled.
syncFut.onDone();
- }
-
- timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
}
/**
*
*/
void start() {
- if (poolSize > 0) {
- for (DemandWorker w : dmdWorkers)
- new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
- }
}
/**
*
*/
void stop() {
- U.cancel(dmdWorkers);
-
- if (log.isDebugEnabled())
- log.debug("Before joining on demand workers: " + dmdWorkers);
-
- U.join(dmdWorkers, log);
-
- if (log.isDebugEnabled())
- log.debug("After joining on demand workers: " + dmdWorkers);
+ if (cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode()) {
+ for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
+ cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
+ }
lastExchangeFut = null;
@@ -177,13 +156,6 @@ public class GridDhtPartitionDemander {
}
/**
- * @return Size of this thread pool.
- */
- int poolSize() {
- return poolSize;
- }
-
- /**
* Force preload.
*/
void forcePreload() {
@@ -225,23 +197,22 @@ public class GridDhtPartitionDemander {
* @param idx
* @return topic
*/
- static Object topic(int idx, int cacheId, UUID nodeId) {
- return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: remove nodeId
+ static Object topic(int idx, int cacheId) {
+ return TOPIC_CACHE.topic("Demander", cacheId, idx);
}
/**
- *
+ * @return {@code True} if topology changed.
*/
- private void leaveBusy() {
- busyLock.readLock().unlock();
+ private boolean topologyChanged(AffinityTopologyVersion topVer) {
+ return !cctx.affinity().affinityTopologyVersion().equals(topVer);
}
/**
- * @param type Type.
- * @param discoEvt Discovery event.
+ *
*/
- private void preloadEvent(int type, DiscoveryEvent discoEvt) {
- preloadEvent(-1, type, discoEvt);
+ private void leaveBusy() {
+ busyLock.readLock().unlock();
}
/**
@@ -256,28 +227,6 @@ public class GridDhtPartitionDemander {
}
/**
- * @param deque Deque to poll from.
- * @param time Time to wait.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(time, MILLISECONDS);
- }
-
- /**
* @param p Partition.
* @param topVer Topology version.
* @return Picked owners.
@@ -316,7 +265,7 @@ public class GridDhtPartitionDemander {
* @param assigns Assignments.
* @param force {@code True} if dummy reassign.
*/
- void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
+ void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assigns);
@@ -325,341 +274,194 @@ public class GridDhtPartitionDemander {
if (delay == 0 || force) {
assert assigns != null;
- synchronized (dmdWorkers) {
- for (DemandWorker w : dmdWorkers)
- w.addAssignments(assigns);
- }
- }
- else if (delay > 0) {
- assert !force;
+ AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
- GridTimeoutObject obj = lastTimeoutObj.get();
+ if (this.assigns != null) {
+ syncFut.get();
- if (obj != null)
- cctx.time().removeTimeoutObject(obj);
-
- final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
- assert exchFut != null : "Delaying rebalance process without topology event.";
-
- obj = new GridTimeoutObjectAdapter(delay) {
- @Override public void onTimeout() {
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
- cctx.shared().exchange().forcePreloadExchange(exchFut);
- }
- });
- }
- };
-
- lastTimeoutObj.set(obj);
+ syncFut = new SyncFuture();
+ }
- cctx.time().addTimeoutObject(obj);
- }
- }
+ if (assigns.isEmpty() || topologyChanged(topVer)) {
+ syncFut.onDone();
- /**
- *
- */
- void unwindUndeploys() {
- demandLock.writeLock().lock();
+ return;
+ }
- try {
- cctx.deploy().unwind(cctx);
- }
- finally {
- demandLock.writeLock().unlock();
- }
- }
+ this.assigns = assigns;
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtPartitionDemander.class, this);
- }
+ for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+ GridDhtPartitionDemandMessage d = e.getValue();
- /**
- *
- */
- private class DemandWorker extends GridWorker {
- /** Worker ID. */
- private int id;
+ d.timeout(cctx.config().getRebalanceTimeout());
+ d.workerId(0);//old api support.
- /** Partition-to-node assignments. */
- private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+ ClusterNode node = e.getKey();
- /** Hide worker logger and use cache logger instead. */
- private IgniteLogger log = GridDhtPartitionDemander.this.log;
+ GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
- /**
- * @param id Worker ID.
- */
- private DemandWorker(int id) {
- super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemander.this.log);
+ remainings.addAll(d.partitions());
- assert id >= 0;
+ syncFut.append(node.id(), remainings);
- this.id = id;
- }
+ int lsnrCnt = cctx.config().getRebalanceThreadPoolSize();
- /**
- * @param assigns Assignments.
- */
- void addAssignments(GridDhtPreloaderAssignments assigns) {
- assert assigns != null;
+ List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
- assignQ.offer(assigns);
+ for (int cnt = 0; cnt < lsnrCnt; cnt++)
+ sParts.add(new HashSet<Integer>());
- if (log.isDebugEnabled())
- log.debug("Added assignments to worker: " + this);
- }
+ Iterator<Integer> it = d.partitions().iterator();
- /**
- * @return {@code True} if topology changed.
- */
- private boolean topologyChanged() {
- return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
- }
+ int cnt = 0;
- /**
- * @param pick Node picked for preloading.
- * @param p Partition.
- * @param entry Preloaded entry.
- * @param topVer Topology version.
- * @return {@code False} if partition has become invalid during preloading.
- * @throws IgniteInterruptedCheckedException If interrupted.
- */
- private boolean preloadEntry(
- ClusterNode pick,
- int p,
- GridCacheEntryInfo entry,
- AffinityTopologyVersion topVer
- ) throws IgniteCheckedException {
- try {
- GridCacheEntryEx cached = null;
+ while (it.hasNext())
+ sParts.get(cnt++ % lsnrCnt).add(it.next());
- try {
- cached = cctx.dht().entryEx(entry.key());
+ for (cnt = 0; cnt < lsnrCnt; cnt++) {
- if (log.isDebugEnabled())
- log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+ if (!sParts.get(cnt).isEmpty()) {
- if (cctx.dht().isIgfsDataCache() &&
- cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
- LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
- "value, will ignore rebalance entries): " + name());
+ // Create copy.
+ GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
- if (cached.markObsoleteIfEmpty(null))
- cached.context().cache().removeIfObsolete(cached.key());
+ initD.topic(topic(cnt, cctx.cacheId()));
- return true;
- }
-
- if (preloadPred == null || preloadPred.apply(entry)) {
- if (cached.initialValue(
- entry.value(),
- entry.version(),
- entry.ttl(),
- entry.expireTime(),
- true,
- topVer,
- cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
- )) {
- cctx.evicts().touch(cached, topVer); // Start tracking.
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
- cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
- (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
- false, null, null, null);
+ try {
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to send partition demand message to local node", ex);
}
- else if (log.isDebugEnabled())
- log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
- ", part=" + p + ']');
}
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
}
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
- cached.key() + ", part=" + p + ']');
- }
- catch (GridDhtInvalidPartitionException ignored) {
- if (log.isDebugEnabled())
- log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
- return false;
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
- cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
- }
-
- return true;
- }
-
- /**
- * @param node Node to demand from.
- * @param topVer Topology version.
- * @param d Demand message.
- * @param exchFut Exchange future.
- * @return Missed partitions.
- * @throws InterruptedException If interrupted.
- * @throws ClusterTopologyCheckedException If node left.
- * @throws IgniteCheckedException If failed to send message.
- */
- private Set<Integer> demandFromNode(
- final ClusterNode node,
- final AffinityTopologyVersion topVer,
- final GridDhtPartitionDemandMessage d,
- final GridDhtPartitionsExchangeFuture exchFut
- ) throws InterruptedException, IgniteCheckedException {
- final GridDhtPartitionTopology top = cctx.dht().topology();
- long timeout = GridDhtPartitionDemander.this.timeout.get();
+ if (log.isInfoEnabled() && !d.partitions().isEmpty()) {
+ LinkedList<Integer> s = new LinkedList<>(d.partitions());
- d.timeout(timeout);
- d.workerId(id);
+ Collections.sort(s);
- final Set<Integer> missed = new HashSet<>();
+ StringBuilder sb = new StringBuilder();
- final ConcurrentHashMap8<Integer, Boolean> remaining = new ConcurrentHashMap8<>();
+ int start = -1;
- for (int p : d.partitions())
- remaining.put(p, false);
+ int prev = -1;
- if (isCancelled() || topologyChanged())
- return missed;
+ Iterator<Integer> sit = s.iterator();
- int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count
+ while (sit.hasNext()) {
+ int p = sit.next();
+ if (start == -1) {
+ start = p;
+ prev = p;
+ }
- List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
+ if (prev < p - 1) {
+ sb.append(start);
- int cnt = 0;
+ if (start != prev)
+ sb.append("-").append(prev);
- while (cnt < threadCnt) {
- sParts.add(new HashSet<Integer>());
+ sb.append(", ");
- final int idx = cnt;
+ start = p;
+ }
- cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
- enterBusy();
+ if (!sit.hasNext()) {
+ sb.append(start);
- try {
- handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
- exchFut, missed, d, remaining);
- }finally{
- leaveBusy();
+ if (start != p)
+ sb.append("-").append(p);
}
+
+ prev = p;
}
- });
- cnt++;
+ log.info("Requested rebalancing [from node=" + node.id() + ", partitions=" + s.size() + " (" + sb.toString() + ")]");
+ }
}
+ }
+ else if (delay > 0) {
+ GridTimeoutObject obj = lastTimeoutObj.get();
- Iterator<Integer> it = d.partitions().iterator();
-
- cnt = 0;
+ if (obj != null)
+ cctx.time().removeTimeoutObject(obj);
- while (it.hasNext()) {
- sParts.get(cnt % threadCnt).add(it.next());
+ final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
- cnt++;
- }
+ assert exchFut != null : "Delaying rebalance process without topology event.";
- try {
- cnt = 0;
+ obj = new GridTimeoutObjectAdapter(delay) {
+ @Override public void onTimeout() {
+ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+ cctx.shared().exchange().forcePreloadExchange(exchFut);
+ }
+ });
+ }
+ };
- while (cnt < threadCnt) {
+ lastTimeoutObj.set(obj);
- // Create copy.
- GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+ cctx.time().addTimeoutObject(obj);
+ }
+ }
- initD.topic(topic(cnt, cctx.cacheId(),node.id()));
+ /**
+ *
+ */
+ void unwindUndeploys() {
+ demandLock.writeLock().lock();
- try {
- if (logg && cctx.name().equals("cache"))
- System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id());
+ try {
+ cctx.deploy().unwind(cctx);
+ }
+ finally {
+ demandLock.writeLock().unlock();
+ }
+ }
- cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send partition demand message to local node", e);
- }
+ /**
+ * @param idx Index.
+ * @param id Node id.
+ * @param supply Supply.
+ */
+ private void handleSupplyMessage(
+ int idx,
+ final UUID id,
+ final GridDhtPartitionSupplyMessage supply) {
+ ClusterNode node = cctx.node(id);
- cnt++;
- }
+ assert node != null;
- do {
- U.sleep(1000);//Todo: improve
- }
- while (!isCancelled() && !topologyChanged() && !remaining.isEmpty());
+ GridDhtPartitionDemandMessage d = assigns.get(node);
- return missed;
- }
- finally {
- cnt = 0;
+ AffinityTopologyVersion topVer = d.topologyVersion();
- while (cnt < threadCnt) {
- cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), node.id()));
+ if (topologyChanged(topVer)) {
+ syncFut.cancel(id);
- cnt++;
- }
- }
+ return;
}
- boolean logg = false;
-
- /**
- * @param s Supply message.
- * @param node Node.
- * @param topVer Topology version.
- * @param top Topology.
- * @param exchFut Exchange future.
- * @param missed Missed.
- * @param d initial DemandMessage.
- */
- private void handleSupplyMessage(
- int idx,
- SupplyMessage s,
- ClusterNode node,
- AffinityTopologyVersion topVer,
- GridDhtPartitionTopology top,
- GridDhtPartitionsExchangeFuture exchFut,
- Set<Integer> missed,
- GridDhtPartitionDemandMessage d,
- ConcurrentHashMap8 remaining) {
-
- if (logg && cctx.name().equals("cache"))
- System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id());
-
- // Check that message was received from expected node.
- if (!s.senderId().equals(node.id())) {
- U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
- ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+ if (log.isDebugEnabled())
+ log.debug("Received supply message: " + supply);
- return;
- }
+ // Check whether there were class loading errors on unmarshal
+ if (supply.classError() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Class got undeployed during preloading: " + supply.classError());
- if (topologyChanged())
- return;
+ syncFut.cancel(id);
- if (log.isDebugEnabled())
- log.debug("Received supply message: " + s);
+ return;
+ }
- GridDhtPartitionSupplyMessage supply = s.supply();
+ final GridDhtPartitionTopology top = cctx.dht().topology();
- // Check whether there were class loading errors on unmarshal
- if (supply.classError() != null) {
- if (log.isDebugEnabled())
- log.debug("Class got undeployed during preloading: " + supply.classError());
+ GridDhtPartitionsExchangeFuture exchFut = assigns.exchangeFuture();
- return;
- }
+ try {
// Preload.
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
@@ -689,19 +491,12 @@ public class GridDhtPartitionDemander {
continue;
}
- try {
- if (!preloadEntry(node, p, entry, topVer)) {
- if (log.isDebugEnabled())
- log.debug("Got entries for invalid partition during " +
- "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-
- break;
- }
- }
- catch (IgniteCheckedException ex) {
- cancel();
+ if (!preloadEntry(node, p, entry, topVer)) {
+ if (log.isDebugEnabled())
+ log.debug("Got entries for invalid partition during " +
+ "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
- return;
+ break;
}
}
@@ -710,12 +505,9 @@ public class GridDhtPartitionDemander {
// If message was last for this partition,
// then we take ownership.
if (last) {
- top.own(part);//todo: close future?
-
-// if (logg && cctx.name().equals("cache"))
-// System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
+ top.own(part);
- remaining.remove(p);
+ syncFut.onPartitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Finished rebalancing partition: " + part);
@@ -731,218 +523,139 @@ public class GridDhtPartitionDemander {
}
}
else {
- remaining.remove(p);
+ syncFut.onPartitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
}
}
else {
- remaining.remove(p);
+ syncFut.onPartitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
}
}
- for (Integer miss : s.supply().missed())
- remaining.remove(miss);
-
// Only request partitions based on latest topology version.
- for (Integer miss : s.supply().missed())
+ for (Integer miss : supply.missed())
if (cctx.affinity().localNode(miss, topVer))
- missed.add(miss);
+ syncFut.onMissedPartition(id, miss);
- if (!remaining.isEmpty()) {
- try {
- // Create copy.
- GridDhtPartitionDemandMessage nextD =
- new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+ for (Integer miss : supply.missed())
+ syncFut.onPartitionDone(id, miss);
- nextD.topic(topic(idx, cctx.cacheId(), node.id()));
+ if (!syncFut.isDone()) {
- // Send demand message.
- cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
- nextD, cctx.ioPolicy(), d.timeout());
+ // Create copy.
+ GridDhtPartitionDemandMessage nextD =
+ new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
- if (logg && cctx.name().equals("cache"))
- System.out.println("D " + idx + " ack " + cctx.localNode().id());
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to receive partitions from node (rebalancing will not " +
- "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
+ nextD.topic(topic(idx, cctx.cacheId()));
- cancel();
- }
+ // Send demand message.
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
+ nextD, cctx.ioPolicy(), d.timeout());
}
}
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+ ", msg=" + e.getMessage() + ']');
+ syncFut.cancel(id);
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+ "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- try {
- int rebalanceOrder = cctx.config().getRebalanceOrder();
-
- if (!CU.isMarshallerCache(cctx.name())) {
- if (log.isDebugEnabled())
- log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
+ syncFut.cancel(id);
+ }
+ }
- try {
- cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ']');
+ /**
+ * @param pick Node picked for preloading.
+ * @param p Partition.
+ * @param entry Preloaded entry.
+ * @param topVer Topology version.
+ * @return {@code False} if partition has become invalid during preloading.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ private boolean preloadEntry(
+ ClusterNode pick,
+ int p,
+ GridCacheEntryInfo entry,
+ AffinityTopologyVersion topVer
+ ) throws IgniteCheckedException {
+ try {
+ GridCacheEntryEx cached = null;
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
- }
- }
+ try {
+ cached = cctx.dht().entryEx(entry.key());
- if (rebalanceOrder > 0) {
- IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
- try {
- if (fut != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
- ", rebalanceOrder=" + rebalanceOrder + ']');
+ if (cctx.dht().isIgfsDataCache() &&
+ cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+ LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+ "value, will ignore rebalance entries)");
- fut.get();
- }
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+ if (cached.markObsoleteIfEmpty(null))
+ cached.context().cache().removeIfObsolete(cached.key());
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
- }
+ return true;
}
- GridDhtPartitionsExchangeFuture exchFut = null;
-
- boolean stopEvtFired = false;
-
- while (!isCancelled()) {
- try {
- barrier.await();
-
- if (id == 0 && exchFut != null && !exchFut.dummy() &&
- cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
-
- if (!cctx.isReplicated() || !stopEvtFired) {
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
- stopEvtFired = true;
- }
- }
- }
- catch (BrokenBarrierException ignore) {
- throw new InterruptedException("Demand worker stopped.");
- }
-
- // Sync up all demand threads at this step.
- GridDhtPreloaderAssignments assigns = null;
-
- while (assigns == null)
- assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
-
- demandLock.readLock().lock();
-
- try {
- exchFut = assigns.exchangeFuture();
-
- // Assignments are empty if preloading is disabled.
- if (assigns.isEmpty())
- continue;
-
- boolean resync = false;
-
- // While.
- // =====
- while (!isCancelled() && !topologyChanged() && !resync) {
- Collection<Integer> missed = new HashSet<>();
-
- // For.
- // ===
- for (ClusterNode node : assigns.keySet()) {
- if (topologyChanged() || isCancelled())
- break; // For.
-
- GridDhtPartitionDemandMessage d = assigns.remove(node);
-
- // If another thread is already processing this message,
- // move to the next node.
- if (d == null)
- continue; // For.
-
- try {
- Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
-
- if (!set.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
- set + ']');
-
- missed.addAll(set);
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
- ", msg=" + e.getMessage() + ']');
-
- resync = true;
-
- break; // For.
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to receive partitions from node (rebalancing will not " +
- "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
- }
- }
-
- // Processed missed entries.
- if (!missed.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Reassigning partitions that were missed: " + missed);
-
- assert exchFut.exchangeId() != null;
-
- cctx.shared().exchange().forceDummyExchange(true, exchFut);
- }
- else
- break; // While.
- }
- }
- finally {
- demandLock.readLock().unlock();
-
- syncFut.onWorkerDone(this);
+ if (preloadPred == null || preloadPred.apply(entry)) {
+ if (cached.initialValue(
+ entry.value(),
+ entry.version(),
+ entry.ttl(),
+ entry.expireTime(),
+ true,
+ topVer,
+ cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+ )) {
+ cctx.evicts().touch(cached, topVer); // Start tracking.
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+ cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+ (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+ false, null, null, null);
}
-
- cctx.shared().exchange().scheduleResendPartitions();
+ else if (log.isDebugEnabled())
+ log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+ ", part=" + p + ']');
}
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
}
- finally {
- // Safety.
- syncFut.onWorkerDone(this);
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+ cached.key() + ", part=" + p + ']');
}
- }
+ catch (GridDhtInvalidPartitionException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Partition became invalid during rebalancing (will ignore): " + p);
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DemandWorker.class, this, "assignQ", assignQ, "super", super.toString());
+ return false;
+ }
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw e;
}
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+ cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtPartitionDemander.class, this);
}
/**
@@ -1035,88 +748,80 @@ public class GridDhtPartitionDemander {
return assigns;
}
- /**
- *
- */
- private class SyncFuture extends GridFutureAdapter<Object> {
- /** */
- private static final long serialVersionUID = 0L;
+/**
+ *
+ */
+private class SyncFuture extends GridFutureAdapter<Object> {
+ /** */
+ private static final long serialVersionUID = 1L;
- /** Remaining workers. */
- private Collection<DemandWorker> remaining;
+ private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
- /**
- * @param workers List of workers.
- */
- private SyncFuture(Collection<DemandWorker> workers) {
- assert workers.size() == poolSize();
+ private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
- remaining = Collections.synchronizedList(new LinkedList<>(workers));
- }
+ public void append(UUID nodeId, Collection<Integer> parts) {
+ remaining.put(nodeId, parts);
- /**
- * @param w Worker who iterated through all partitions.
- */
- void onWorkerDone(DemandWorker w) {
- if (isDone())
- return;
+ missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+ }
- if (remaining.remove(w))
- if (log.isDebugEnabled())
- log.debug("Completed full partition iteration for worker [worker=" + w + ']');
+ void cancel(UUID nodeId) {
+ if (isDone())
+ return;
- if (remaining.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Completed sync future.");
+ remaining.remove(nodeId);
- onDone();
- }
- }
+ checkIsDone();
}
- /**
- * Supply message wrapper.
- */
- private static class SupplyMessage {
- /** Sender ID. */
- private UUID sndId;
-
- /** Supply message. */
- private GridDhtPartitionSupplyMessage supply;
-
- /**
- * Dummy constructor.
- */
- private SupplyMessage() {
- // No-op.
- }
+ void onMissedPartition(UUID nodeId, int p) {
+ if (missed.get(nodeId) == null)
+ missed.put(nodeId, new GridConcurrentHashSet<Integer>());
- /**
- * @param sndId Sender ID.
- * @param supply Supply message.
- */
- SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
- this.sndId = sndId;
- this.supply = supply;
- }
+ missed.get(nodeId).add(p);
+ }
- /**
- * @return Sender ID.
- */
- UUID senderId() {
- return sndId;
- }
+ void onPartitionDone(UUID nodeId, int p) {
+ if (isDone())
+ return;
+
+ Collection<Integer> parts = remaining.get(nodeId);
+
+ parts.remove(p);
- /**
- * @return Message.
- */
- GridDhtPartitionSupplyMessage supply() {
- return supply;
+ if (parts.isEmpty()) {
+ remaining.remove(nodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']');
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SupplyMessage.class, this);
+ checkIsDone();
+ }
+
+ private void checkIsDone() {
+ if (remaining.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Completed sync future.");
+
+ Collection<Integer> m = new HashSet<>();
+
+ for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+ if (e.getValue() != null && !e.getValue().isEmpty())
+ m.addAll(e.getValue());
+ }
+
+ if (!m.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Reassigning partitions that were missed: " + m);
+
+ cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+ }
+
+ missed.clear();
+
+ onDone();
}
}
}
+}
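The new addAssignments() also logs the partitions requested from each node as compact ranges. A cleaner standalone version of that range rendering (hypothetical class name, same output format):

import java.util.Collections;
import java.util.List;

public class PartitionRangesSketch {
    /** Renders a partition list as compact ranges, e.g. [0,1,2,5,7,8] -> "0-2, 5, 7-8". */
    static String ranges(List<Integer> parts) {
        Collections.sort(parts);

        StringBuilder sb = new StringBuilder();

        int start = -1;
        int prev = -1;

        for (int p : parts) {
            if (start == -1)
                start = prev = p;
            else if (p == prev + 1)
                prev = p;
            else {
                append(sb, start, prev);

                start = prev = p;
            }
        }

        if (start != -1)
            append(sb, start, prev);

        return sb.toString();
    }

    private static void append(StringBuilder sb, int start, int prev) {
        if (sb.length() > 0)
            sb.append(", ");

        sb.append(start);

        if (prev != start)
            sb.append('-').append(prev);
    }

    public static void main(String[] args) {
        System.out.println(ranges(new java.util.ArrayList<>(java.util.Arrays.asList(0, 1, 2, 5, 7, 8))));
    }
}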
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 920d10d..b948fbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -108,7 +108,7 @@ class GridDhtPartitionSupplier {
* @return topic
*/
static Object topic(int idx, int id) {
- return TOPIC_CACHE.topic("SupplyPool", idx, id);
+ return TOPIC_CACHE.topic("Supplier", idx, id);
}
/**
@@ -138,8 +138,6 @@ class GridDhtPartitionSupplier {
this.preloadPred = preloadPred;
}
- boolean logg = false;
-
/**
* @return {@code true} if entered to busy state.
*/
@@ -172,9 +170,6 @@ class GridDhtPartitionSupplier {
if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
return;
- if (logg && cctx.name().equals("cache"))
- System.out.println("S " + idx + " process message " + cctx.localNode().id());
-
GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
d.updateSequence(), cctx.cacheId());
@@ -191,12 +186,8 @@ class GridDhtPartitionSupplier {
doneMap.remove(scId);
}
- if (doneMap.get(scId) != null) {
- if (logg && cctx.name().equals("cache"))
- System.out.println("S " + idx + " exit " + cctx.localNode().id());
-
+ if (doneMap.get(scId) != null)
return;
- }
long bCnt = 0;
@@ -282,9 +273,6 @@ class GridDhtPartitionSupplier {
return;
}
else {
- if (logg && cctx.name().equals("cache"))
- System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
-
s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
cctx.cacheId());
}
@@ -473,9 +461,6 @@ class GridDhtPartitionSupplier {
// Mark as last supply message.
s.last(part);
-// if (logg && cctx.name().equals("cache"))
-// System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
-
phase = 0;
sctx = null;
@@ -508,8 +493,6 @@ class GridDhtPartitionSupplier {
*/
private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
throws IgniteCheckedException {
- if (logg && cctx.name().equals("cache"))
- System.out.println("S sent "+ cctx.localNode().id());
try {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a22f281..8a097ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -254,7 +254,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
demander.addAssignments(assignments, forcePreload);
}
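
The topic rename in the supplier hunk above reflects that rebalance messages now travel over per-thread ordered topics built from a purpose string, a thread index and a cache id. A rough stand-in for that kind of composite topic key in plain Java (TopicKey is an illustrative name, not Ignite's GridTopic API):

    import java.util.Arrays;

    /** Illustrative composite key: one ordered topic per (purpose, thread index, cache id). */
    final class TopicKey {
        private final String purpose;
        private final int idx;
        private final int cacheId;

        TopicKey(String purpose, int idx, int cacheId) {
            this.purpose = purpose;
            this.idx = idx;
            this.cacheId = cacheId;
        }

        /** Value equality matters: ordered handlers are registered and removed by topic value. */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (!(o instanceof TopicKey))
                return false;

            TopicKey k = (TopicKey)o;

            return idx == k.idx && cacheId == k.cacheId && purpose.equals(k.purpose);
        }

        @Override public int hashCode() {
            return Arrays.hashCode(new Object[] {purpose, idx, cacheId});
        }

        @Override public String toString() {
            return purpose + '-' + idx + '-' + cacheId;
        }
    }

With such a key every rebalance thread index gets its own topic, e.g. new TopicKey("Supplier", idx, cacheId), so different indexes can be handled independently while messages on one topic keep their order.
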
[2/8] incubator-ignite git commit: ignite-1093 Code cleanup
Posted by av...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..920d10d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Supplies partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+ /** */
+ private final GridCacheContext<?, ?> cctx;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final ReadWriteLock busyLock;
+
+ /** */
+ private GridDhtPartitionTopology top;
+
+ /** */
+ private final boolean depEnabled;
+
+ /** Preload predicate. */
+ private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+ /** Supply context map. */
+ private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+ /** Done map. */
+ private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();
+
+ /**
+ * @param cctx Cache context.
+ * @param busyLock Shutdown lock.
+ */
+ GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+ assert cctx != null;
+ assert busyLock != null;
+
+ this.cctx = cctx;
+ this.busyLock = busyLock;
+
+ log = cctx.logger(getClass());
+
+ top = cctx.dht().topology();
+
+ if (!cctx.kernalContext().clientNode()) {
+ for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
+ final int idx = cnt;
+
+ cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ if (!enterBusy())
+ return;
+
+ try {
+ processMessage(m, id, idx);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ }
+ }
+
+ depEnabled = cctx.gridDeploy().enabled();
+ }
+
+ /**
+ * @param idx Index.
+ * @param id Node id.
+ * @return topic
+ */
+ static Object topic(int idx, int id) {
+ return TOPIC_CACHE.topic("SupplyPool", idx, id);
+ }
+
+ /**
+ *
+ */
+ void start() {
+ }
+
+ /**
+ *
+ */
+ void stop() {
+ top = null;
+
+ if (!cctx.kernalContext().clientNode()) {
+ for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
+ cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
+ }
+ }
+
+ /**
+ * Sets preload predicate for supply pool.
+ *
+ * @param preloadPred Preload predicate.
+ */
+ void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+ this.preloadPred = preloadPred;
+ }
+
+ boolean logg = false;
+
+ /**
+ * @return {@code true} if entered to busy state.
+ */
+ private boolean enterBusy() {
+ if (busyLock.readLock().tryLock())
+ return true;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
+
+ return false;
+ }
+
+ /**
+ *
+ */
+ private void leaveBusy() {
+ busyLock.readLock().unlock();
+ }
+
+ /**
+ * @param d Demand message.
+ * @param id Node uuid.
+ * @param idx Index.
+ */
+ private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
+ assert d != null;
+ assert id != null;
+
+ if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+ return;
+
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("S " + idx + " process message " + cctx.localNode().id());
+
+ GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+ d.updateSequence(), cctx.cacheId());
+
+ long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+ ClusterNode node = cctx.discovery().node(id);
+
+ T2<UUID, Object> scId = new T2<>(id, d.topic());
+
+ try {
+ SupplyContext sctx = scMap.remove(scId);
+
+ if (!d.partitions().isEmpty()) {//Only initial request contains partitions.
+ doneMap.remove(scId);
+ }
+
+ if (doneMap.get(scId) != null) {
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("S " + idx + " exit " + cctx.localNode().id());
+
+ return;
+ }
+
+ long bCnt = 0;
+
+ int phase = 0;
+
+ boolean newReq = true;
+
+ long maxBatchesCnt = 3;//Todo: param
+
+ if (sctx != null) {
+ phase = sctx.phase;
+
+ maxBatchesCnt = 1;
+ }
+
+ Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+ while ((sctx != null && newReq) || partIt.hasNext()) {
+ int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+ newReq = false;
+
+ GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+ if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+ // Reply with partition of "-1" to let sender know that
+ // this node is no longer an owner.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Requested partition is not owned by local node [part=" + part +
+ ", demander=" + id + ']');
+
+ continue;
+ }
+
+ GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+ try {
+ if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+ swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+ cctx.swap().addOffHeapListener(part, swapLsnr);
+ cctx.swap().addSwapListener(part, swapLsnr);
+ }
+
+ boolean partMissing = false;
+
+ if (phase == 0)
+ phase = 1;
+
+ if (phase == 1) {
+ Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+ (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+ while (entIt.hasNext()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition, so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition [part=" + part +
+ ", nodeId=" + id + ']');
+
+ partMissing = true;
+
+ break;
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ if (!reply(node, d, s))
+ return;
+
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
+
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+
+ swapLsnr = null;
+
+ return;
+ }
+ else {
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
+
+ s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+ cctx.cacheId());
+ }
+ }
+
+ GridCacheEntryEx e = entIt.next();
+
+ GridCacheEntryInfo info = e.info();
+
+ if (info != null && !info.isNew()) {
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry(part, info, cctx);
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+ info);
+ }
+ }
+
+ if (partMissing)
+ continue;
+
+ }
+
+ if (phase == 1)
+ phase = 2;
+
+ if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+ GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
+ (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+ cctx.swap().iterator(part);
+
+ // Iterator may be null if space does not exist.
+ if (iter != null) {
+ try {
+ boolean prepared = false;
+
+ while (iter.hasNext()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
+
+ partMissing = true;
+
+ break; // For.
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ if (!reply(node, d, s))
+ return;
+
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
+
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
+
+ swapLsnr = null;
+
+ return;
+ }
+ else {
+ s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+ cctx.cacheId());
+ }
+ }
+
+ Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+ GridCacheSwapEntry swapEntry = e.getValue();
+
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+ info.keyBytes(e.getKey());
+ info.ttl(swapEntry.ttl());
+ info.expireTime(swapEntry.expireTime());
+ info.version(swapEntry.version());
+ info.value(swapEntry.value());
+
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry0(part, info, cctx);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not send " +
+ "cache entry): " + info);
+
+ continue;
+ }
+
+ // Need to manually prepare cache message.
+ if (depEnabled && !prepared) {
+ ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+ swapEntry.valueClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+ null;
+
+ if (ldr == null)
+ continue;
+
+ if (ldr instanceof GridDeploymentInfo) {
+ s.prepare((GridDeploymentInfo)ldr);
+
+ prepared = true;
+ }
+ }
+ }
+
+ if (partMissing)
+ continue;
+ }
+ finally {
+ iter.close();
+ }
+ }
+ }
+
+ if (swapLsnr == null && sctx != null)
+ swapLsnr = sctx.swapLsnr;
+
+ // Stop receiving promote notifications.
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
+
+ if (phase == 2)
+ phase = 3;
+
+ if (phase == 3 && swapLsnr != null) {
+ Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+ swapLsnr = null;
+
+ Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
+ (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+ while (lsnrIt.hasNext()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
+
+ // No need to continue iteration over swap entries.
+ break;
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ if (!reply(node, d, s))
+ return;
+
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
+
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
+
+ return;
+ }
+ else {
+ s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+ cctx.cacheId());
+ }
+ }
+
+ GridCacheEntryInfo info = lsnrIt.next();
+
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry(part, info, cctx);
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+ info);
+ }
+ }
+
+ // Mark as last supply message.
+ s.last(part);
+
+// if (logg && cctx.name().equals("cache"))
+// System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
+
+ phase = 0;
+
+ sctx = null;
+ }
+ finally {
+ loc.release();
+
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
+ }
+ }
+
+ reply(node, d, s);
+
+ doneMap.put(scId, true);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partition supply message to node: " + id, e);
+ }
+ }
+
+ /**
+ * @param n Node.
+ * @param d Demand message.
+ * @param s Supply message.
+ * @return {@code True} if message was sent, {@code false} if recipient left grid.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+ throws IgniteCheckedException {
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("S sent "+ cctx.localNode().id());
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+ cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+ return true;
+ }
+ catch (ClusterTopologyCheckedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+ return false;
+ }
+ }
+
+ /**
+ * @param t Tuple.
+ * @param phase Phase.
+ * @param partIt Partition iterator.
+ * @param part Partition.
+ * @param entryIt Entry iterator.
+ * @param swapLsnr Swap listener.
+ */
+ private void saveSupplyContext(
+ T2 t,
+ int phase,
+ Iterator<Integer> partIt,
+ int part,
+ Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){
+ scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+ }
+
+ /**
+ * Supply context.
+ */
+ private static class SupplyContext{
+ /** Phase. */
+ private int phase;
+
+ /** Partition iterator. */
+ private Iterator<Integer> partIt;
+
+ /** Entry iterator. */
+ private Iterator<?> entryIt;
+
+ /** Swap listener. */
+ private GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+ /** Partition. */
+ int part;
+
+ /**
+ * @param phase Phase.
+ * @param partIt Partition iterator.
+ * @param entryIt Entry iterator.
+ * @param swapLsnr Swap listener.
+ * @param part Partition.
+ */
+ public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
+ GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+ this.phase = phase;
+ this.partIt = partIt;
+ this.entryIt = entryIt;
+ this.swapLsnr = swapLsnr;
+ this.part = part;
+ }
+ }
+}
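
GridDhtPartitionSupplier above sends at most maxBatchesCnt batches per demand message; when the limit is reached it parks a SupplyContext (phase, partition iterator, entry iterator, current partition) keyed by demander and topic, and the next demand message resumes exactly where the previous one stopped. A minimal sketch of the same save/resume idea, using a plain list of integers in place of cache entries (ResumableSupplier and its names are illustrative, not Ignite API):

    import java.util.*;

    /** Illustrative save/resume pattern: emit a few batches, park the iterator, resume later. */
    class ResumableSupplier {
        /** Saved position per demander (stand-in for the supplier's (nodeId, topic) key). */
        private final Map<UUID, Iterator<Integer>> saved = new HashMap<>();

        /** Returns up to {@code maxBatches} batches of {@code batchSize} entries per call. */
        List<List<Integer>> supply(UUID demander, List<Integer> entries, int batchSize, int maxBatches) {
            // Resume from a parked position, if any; otherwise start from the beginning.
            Iterator<Integer> it = saved.remove(demander);

            if (it == null)
                it = entries.iterator();

            List<List<Integer>> batches = new ArrayList<>();
            List<Integer> batch = new ArrayList<>();

            while (it.hasNext()) {
                batch.add(it.next());

                if (batch.size() >= batchSize) {
                    batches.add(batch);

                    batch = new ArrayList<>();

                    if (batches.size() >= maxBatches) {
                        // Park the position; the next call for this demander continues here.
                        if (it.hasNext())
                            saved.put(demander, it);

                        return batches;
                    }
                }
            }

            // Last, possibly partial batch: nothing left to park.
            if (!batch.isEmpty())
                batches.add(batch);

            return batches;
        }
    }

The important property mirrored from the supplier is that a resumed request never restarts the iteration from scratch, so a large partition can be streamed across many demand/supply round trips.
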
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
deleted file mode 100644
index c1c9941..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jsr166.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.locks.*;
-
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-
-/**
- * Thread pool for supplying partitions to demanding nodes.
- */
-class GridDhtPartitionSupplyPool {
- /** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private GridDhtPartitionTopology top;
-
- /** */
- private final boolean depEnabled;
-
- /** Preload predicate. */
- private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
- /** Supply context map. */
- private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
-
- /** Done map. */
- private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();//Todo: refactor
-
- /**
- * @param cctx Cache context.
- * @param busyLock Shutdown lock.
- */
- GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
- assert cctx != null;
- assert busyLock != null;
-
- this.cctx = cctx;
-
- log = cctx.logger(getClass());
-
- top = cctx.dht().topology();
-
- int cnt = 0;
-
- if (!cctx.kernalContext().clientNode()) {
- while (cnt < cctx.config().getRebalanceThreadPoolSize()) {
- final int idx = cnt;
-
- cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- processMessage(m, id, idx);
- }
- });
-
- cnt++;
- }
- }
-
- depEnabled = cctx.gridDeploy().enabled();
- }
-
- /**
- * @param idx Index.
- * @param id Node id.
- * @return topic
- */
- static Object topic(int idx, int id) {
- return TOPIC_CACHE.topic("SupplyPool", idx, id);
- }
-
- /**
- *
- */
- void start() {
- }
-
- /**
- *
- */
- void stop() {
- top = null;
- }
-
- /**
- * Sets preload predicate for supply pool.
- *
- * @param preloadPred Preload predicate.
- */
- void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- boolean logg = false;
-
- /**
- * @param d Demand message.
- * @param id Node uuid.
- */
- private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
- assert d != null;
- assert id != null;
-
- if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
- return;
-
- if (logg && cctx.name().equals("cache"))
- System.out.println("S " + idx + " process message " + cctx.localNode().id());
-
- GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId());
-
- long preloadThrottle = cctx.config().getRebalanceThrottle();
-
- ClusterNode node = cctx.discovery().node(id);
-
- T2<UUID, Object> scId = new T2<>(id, d.topic());
-
- try {
- SupplyContext sctx = scMap.remove(scId);
-
- if (!d.partitions().isEmpty()) {//Only first request contains partitions.
- doneMap.remove(scId);
- }
-
- if (doneMap.get(scId) != null) {
- if (logg && cctx.name().equals("cache"))
- System.out.println("S " + idx + " exit " + cctx.localNode().id());
-
- return;
- }
-
- long bCnt = 0;
-
- int phase = 0;
-
- boolean newReq = true;
-
- long maxBatchesCnt = 3;//Todo: param
-
- if (sctx != null) {
- phase = sctx.phase;
-
- maxBatchesCnt = 1;
- }
-
- Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
-
- while ((sctx != null && newReq) || partIt.hasNext()) {
- int part = sctx != null && newReq ? sctx.part : partIt.next();
-
- newReq = false;
-
- GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
- if (loc == null || loc.state() != OWNING || !loc.reserve()) {
- // Reply with partition of "-1" to let sender know that
- // this node is no longer an owner.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Requested partition is not owned by local node [part=" + part +
- ", demander=" + id + ']');
-
- continue;
- }
-
- GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
- try {
- if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
- swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
- cctx.swap().addOffHeapListener(part, swapLsnr);
- cctx.swap().addSwapListener(part, swapLsnr);
- }
-
- boolean partMissing = false;
-
- if (phase == 0)
- phase = 1;
-
- if (phase == 1) {
- Iterator<GridDhtCacheEntry> entIt = sctx != null ?
- (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
-
- while (entIt.hasNext()) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition, so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition [part=" + part +
- ", nodeId=" + id + ']');
-
- partMissing = true;
-
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- if (!reply(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
-
- swapLsnr = null;
-
- return;
- }
- else {
- if (logg && cctx.name().equals("cache"))
- System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
-
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId());
- }
- }
-
- GridCacheEntryEx e = entIt.next();
-
- GridCacheEntryInfo info = e.info();
-
- if (info != null && !info.isNew()) {
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- if (partMissing)
- continue;
-
- }
-
- if (phase == 1)
- phase = 2;
-
- if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
- GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
- (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
- cctx.swap().iterator(part);
-
- // Iterator may be null if space does not exist.
- if (iter != null) {
- try {
- boolean prepared = false;
-
- while (iter.hasNext()) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + id + ']');
-
- partMissing = true;
-
- break; // For.
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- if (!reply(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
-
- swapLsnr = null;
-
- return;
- }
- else {
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId());
- }
- }
-
- Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
-
- GridCacheSwapEntry swapEntry = e.getValue();
-
- GridCacheEntryInfo info = new GridCacheEntryInfo();
-
- info.keyBytes(e.getKey());
- info.ttl(swapEntry.ttl());
- info.expireTime(swapEntry.expireTime());
- info.version(swapEntry.version());
- info.value(swapEntry.value());
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry0(part, info, cctx);
- else {
- if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not send " +
- "cache entry): " + info);
-
- continue;
- }
-
- // Need to manually prepare cache message.
- if (depEnabled && !prepared) {
- ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
- swapEntry.valueClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
- null;
-
- if (ldr == null)
- continue;
-
- if (ldr instanceof GridDeploymentInfo) {
- s.prepare((GridDeploymentInfo)ldr);
-
- prepared = true;
- }
- }
- }
-
- if (partMissing)
- continue;
- }
- finally {
- iter.close();
- }
- }
- }
-
- if (swapLsnr == null && sctx != null)
- swapLsnr = sctx.swapLsnr;
-
- // Stop receiving promote notifications.
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
-
- if (phase == 2)
- phase = 3;
-
- if (phase == 3 && swapLsnr != null) {
- Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
- swapLsnr = null;
-
- Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
- (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
-
- while (lsnrIt.hasNext()) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + id + ']');
-
- // No need to continue iteration over swap entries.
- break;
- }
-
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- if (!reply(node, d, s))
- return;
-
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
-
- if (++bCnt >= maxBatchesCnt) {
- saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
-
- return;
- }
- else {
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId());
- }
- }
-
- GridCacheEntryInfo info = lsnrIt.next();
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
- }
- }
-
- // Mark as last supply message.
- s.last(part);
-
-// if (logg && cctx.name().equals("cache"))
-// System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
-
- phase = 0;
-
- sctx = null;
- }
- finally {
- loc.release();
-
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
- }
- }
-
- reply(node, d, s);
-
- doneMap.put(scId, true);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send partition supply message to node: " + id, e);
- }
- }
-
- /**
- * @param n Node.
- * @param s Supply message.
- * @return {@code True} if message was sent, {@code false} if recipient left grid.
- * @throws IgniteCheckedException If failed.
- */
- private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
- throws IgniteCheckedException {
- if (logg && cctx.name().equals("cache"))
- System.out.println("S sent "+ cctx.localNode().id());
-
- try {
- if (log.isDebugEnabled())
- log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
- cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
- return true;
- }
- catch (ClusterTopologyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
- return false;
- }
- }
-
-
- /**
- * Demand message wrapper.
- */
- private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param sndId Sender ID.
- * @param msg Message.
- */
- DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
- super(sndId, msg);
- }
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public DemandMessage() {
- // No-op.
- }
-
- /**
- * @return Sender ID.
- */
- UUID senderId() {
- return get1();
- }
-
- /**
- * @return Message.
- */
- public GridDhtPartitionDemandMessage message() {
- return get2();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
- }
- }
-
-
- /**
- * @param t T.
- * @param phase Phase.
- * @param partIt Partition it.
- * @param entryIt Entry it.
- * @param swapLsnr Swap listener.
- */
- private void saveSupplyContext(
- T2 t,
- int phase,
- Iterator<Integer> partIt,
- int part,
- Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){
- scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
- }
-
- private static class SupplyContext{
- private int phase;
-
- private Iterator<Integer> partIt;
-
- private Iterator<?> entryIt;
-
- private GridCacheEntryInfoCollectSwapListener swapLsnr;
-
- int part;
-
- public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
- GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
- this.phase = phase;
- this.partIt = partIt;
- this.entryIt = entryIt;
- this.swapLsnr = swapLsnr;
- this.part = part;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index fbcbc37..a22f281 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -61,10 +61,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
/** Partition suppliers. */
- private GridDhtPartitionSupplyPool supplyPool;
+ private GridDhtPartitionSupplier supplier;
/** Partition demanders. */
- private GridDhtPartitionDemandPool demandPool;
+ private GridDhtPartitionDemander demander;
/** Start future. */
private GridFutureAdapter<Object> startFut;
@@ -159,8 +159,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
});
- supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
- demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
+ supplier = new GridDhtPartitionSupplier(cctx, busyLock);
+ demander = new GridDhtPartitionDemander(cctx, busyLock);
cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -180,18 +180,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
topVer.setIfGreater(startTopVer);
- supplyPool.start();
- demandPool.start();
+ supplier.start();
+ demander.start();
}
/** {@inheritDoc} */
@Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
super.preloadPredicate(preloadPred);
- assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()";
+ assert supplier != null && demander != null : "preloadPredicate may be called only after start()";
- supplyPool.preloadPredicate(preloadPred);
- demandPool.preloadPredicate(preloadPred);
+ supplier.preloadPredicate(preloadPred);
+ demander.preloadPredicate(preloadPred);
}
/** {@inheritDoc} */
@@ -205,11 +205,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
// Acquire write busy lock.
busyLock.writeLock().lock();
- if (supplyPool != null)
- supplyPool.stop();
+ if (supplier != null)
+ supplier.stop();
- if (demandPool != null)
- demandPool.stop();
+ if (demander != null)
+ demander.stop();
top = null;
}
@@ -226,7 +226,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
- demandPool.syncFuture().listen(new CI1<Object>() {
+ demander.syncFuture().listen(new CI1<Object>() {
@Override public void apply(Object t) {
U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
"[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
@@ -245,17 +245,17 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
- demandPool.updateLastExchangeFuture(lastFut);
+ demander.updateLastExchangeFuture(lastFut);
}
/** {@inheritDoc} */
@Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
- return demandPool.assign(exchFut);
+ return demander.assign(exchFut);
}
/** {@inheritDoc} */
@Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
- demandPool.addAssignments(assignments, forcePreload);
+ demander.addAssignments(assignments, forcePreload);
}
/**
@@ -267,7 +267,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
+ return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
}
/**
@@ -526,12 +526,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void forcePreload() {
- demandPool.forcePreload();
+ demander.forcePreload();
}
/** {@inheritDoc} */
@Override public void unwindUndeploys() {
- demandPool.unwindUndeploys();
+ demander.unwindUndeploys();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index 4992d19..5148753 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -34,7 +34,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
/** */
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- private static int TEST_SIZE = 10_024_000;
+ private static int TEST_SIZE = 1_024_000;
/** cache name. */
protected static String CACHE_NAME_DHT = "cache";
[4/8] incubator-ignite git commit: ignite-1093 Code cleanup
Posted by av...@apache.org.
ignite-1093 Code cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/76ba5d95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/76ba5d95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/76ba5d95
Branch: refs/heads/ignite-1093
Commit: 76ba5d953133448b2753c0727098e7552ff79753
Parents: 9fbb559
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Mon Aug 10 12:46:26 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Mon Aug 10 12:46:26 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 21 ++++++++------------
1 file changed, 8 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76ba5d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 711b69b..fdd101e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -54,9 +54,6 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
*/
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
public class GridDhtPartitionDemander {
- /** Dummy message to wake up a blocking queue if a node leaves. */
- private final SupplyMessage DUMMY_TOP = new SupplyMessage();
-
/** */
private final GridCacheContext<?, ?> cctx;
@@ -259,14 +256,6 @@ public class GridDhtPartitionDemander {
}
/**
- * @param msg Message to check.
- * @return {@code True} if dummy message.
- */
- private boolean dummyTopology(SupplyMessage msg) {
- return msg == DUMMY_TOP;
- }
-
- /**
* @param deque Deque to poll from.
* @param time Time to wait.
* @param w Worker.
@@ -558,8 +547,14 @@ public class GridDhtPartitionDemander {
cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
@Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
- handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
- exchFut, missed, d, remaining);
+ enterBusy();
+
+ try {
+ handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
+ exchFut, missed, d, remaining);
+ } finally {
+ leaveBusy();
+ }
}
});
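
The change above wraps the demander's supply-message handler in enterBusy()/leaveBusy(), the same read-lock guard the supplier uses so that message handling and shutdown do not race. A self-contained sketch of that guard built on a plain ReentrantReadWriteLock (BusyGuard is an illustrative name, not Ignite API):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /** Illustrative busy-lock guard: handlers take the read lock, shutdown takes the write lock. */
    class BusyGuard {
        private final ReadWriteLock busyLock = new ReentrantReadWriteLock();

        /** @return {@code true} if the handler may proceed (component is not stopping). */
        boolean enterBusy() {
            return busyLock.readLock().tryLock();
        }

        void leaveBusy() {
            busyLock.readLock().unlock();
        }

        /** Waits for in-flight handlers to leave, then keeps new ones out for good. */
        void stop() {
            busyLock.writeLock().lock();
        }

        /** Typical handler shape, mirroring the wrapping added in the hunk above. */
        void onMessage(Runnable handler) {
            if (!enterBusy())
                return; // Stopping: drop the message.

            try {
                handler.run();
            }
            finally {
                leaveBusy();
            }
        }
    }

Once stop() holds the write lock, tryLock() on the read lock fails, so later messages are rejected without blocking the shutdown path.
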
[7/8] incubator-ignite git commit: ignite-1093
Posted by av...@apache.org.
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6a733ef9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6a733ef9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6a733ef9
Branch: refs/heads/ignite-1093
Commit: 6a733ef9165096f4c75e71874938cfc83a6998f8
Parents: db72f53
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 11 19:15:03 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 11 19:15:03 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 44 ++++++++++++++++++++
1 file changed, 44 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6a733ef9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fca9f53..30a04c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -120,6 +120,50 @@ public class GridDhtPartitionDemander {
*
*/
void start() {
+ int rebalanceOrder = cctx.config().getRebalanceOrder();
+
+ if (!CU.isMarshallerCache(cctx.name())) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
+
+ try {
+ cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
+ "[cacheName=" + cctx.name() + ']');
+
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
+ }
+ }
+
+ if (rebalanceOrder > 0) {
+ IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+
+ try {
+ if (fut != null) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
+ ", rebalanceOrder=" + rebalanceOrder + ']');
+
+ fut.get();
+ }
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
+ "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
+ }
+ }
}
/**
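
The start() additions above make a cache's rebalancing wait first for the marshaller cache preloader and then for any cache with a lower rebalance order, treating interruption as a normal "grid is stopping" exit. A small sketch of that wait-in-order pattern with generic futures (OrderedStart is an illustrative name, not Ignite API):

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    /** Illustrative ordered startup: wait for prerequisite rebalance futures before starting. */
    class OrderedStart {
        /**
         * @param prerequisites Futures that must complete first (e.g. the marshaller cache,
         *      then caches with a lower rebalance order).
         * @return {@code true} if startup may proceed, {@code false} if the node is stopping.
         */
        boolean awaitPrerequisites(List<Future<?>> prerequisites) {
            for (Future<?> fut : prerequisites) {
                try {
                    fut.get();
                }
                catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();

                    // Grid is stopping: give up quietly, as the patch above does.
                    return false;
                }
                catch (ExecutionException e) {
                    // Mirrors the patch's stance that an ordered preload future never fails.
                    throw new Error("Prerequisite rebalance future should never fail", e);
                }
            }

            return true;
        }
    }
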
[6/8] incubator-ignite git commit: ignite-1093
Posted by av...@apache.org.
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/db72f531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/db72f531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/db72f531
Branch: refs/heads/ignite-1093
Commit: db72f531342231cf45f49e395056c12cce3a79e5
Parents: d0b7d9f
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 11 18:58:05 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 11 18:58:05 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 135 +------------------
.../dht/preloader/GridDhtPreloader.java | 123 ++++++++++++++++-
2 files changed, 122 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db72f531/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e177dae..fca9f53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -65,9 +65,6 @@ public class GridDhtPartitionDemander {
@GridToStringInclude
private volatile SyncFuture syncFut;
- /** Demand lock. */
- private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
-
/** Last timeout object. */
private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
@@ -227,44 +224,10 @@ public class GridDhtPartitionDemander {
}
/**
- * @param p Partition.
- * @param topVer Topology version.
- * @return Picked owners.
- */
- private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
- Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
- int affCnt = affNodes.size();
-
- Collection<ClusterNode> rmts = remoteOwners(p, topVer);
-
- int rmtCnt = rmts.size();
-
- if (rmtCnt <= affCnt)
- return rmts;
-
- List<ClusterNode> sorted = new ArrayList<>(rmts);
-
- // Sort in descending order, so nodes with higher order will be first.
- Collections.sort(sorted, CU.nodeComparator(false));
-
- // Pick newest nodes.
- return sorted.subList(0, affCnt);
- }
-
- /**
- * @param p Partition.
- * @param topVer Topology version.
- * @return Nodes owning this partition.
- */
- private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
- return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
- }
-
- /**
* @param assigns Assignments.
* @param force {@code True} if dummy reassign.
*/
+
void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assigns);
@@ -408,20 +371,6 @@ public class GridDhtPartitionDemander {
}
/**
- *
- */
- void unwindUndeploys() {
- demandLock.writeLock().lock();
-
- try {
- cctx.deploy().unwind(cctx);
- }
- finally {
- demandLock.writeLock().unlock();
- }
- }
-
- /**
* @param idx Index.
* @param id Node id.
* @param supply Supply.
@@ -666,88 +615,6 @@ public class GridDhtPartitionDemander {
void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
lastExchangeFut = lastFut;
}
-
- /**
- * @param exchFut Exchange future.
- * @return Assignments of partitions to nodes.
- */
- GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
- // No assignments for disabled preloader.
- GridDhtPartitionTopology top = cctx.dht().topology();
-
- if (!cctx.rebalanceEnabled())
- return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
- int partCnt = cctx.affinity().partitions();
-
- assert exchFut.forcePreload() || exchFut.dummyReassign() ||
- exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
- "Topology version mismatch [exchId=" + exchFut.exchangeId() +
- ", topVer=" + top.topologyVersion() + ']';
-
- GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
- AffinityTopologyVersion topVer = assigns.topologyVersion();
-
- for (int p = 0; p < partCnt; p++) {
- if (cctx.shared().exchange().hasPendingExchange()) {
- if (log.isDebugEnabled())
- log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
- exchFut.exchangeId());
-
- break;
- }
-
- // If partition belongs to local node.
- if (cctx.affinity().localNode(p, topVer)) {
- GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
- assert part != null;
- assert part.id() == p;
-
- if (part.state() != MOVING) {
- if (log.isDebugEnabled())
- log.debug("Skipping partition assignment (state is not MOVING): " + part);
-
- continue; // For.
- }
-
- Collection<ClusterNode> picked = pickedOwners(p, topVer);
-
- if (picked.isEmpty()) {
- top.own(part);
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- cctx.events().addPreloadEvent(p,
- EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
- discoEvt.type(), discoEvt.timestamp());
- }
-
- if (log.isDebugEnabled())
- log.debug("Owning partition as there are no other owners: " + part);
- }
- else {
- ClusterNode n = F.first(picked);
-
- GridDhtPartitionDemandMessage msg = assigns.get(n);
-
- if (msg == null) {
- assigns.put(n, msg = new GridDhtPartitionDemandMessage(
- top.updateSequence(),
- exchFut.exchangeId().topologyVersion(),
- cctx.cacheId()));
- }
-
- msg.addPartition(p);
- }
- }
- }
-
- return assigns;
- }
-
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db72f531/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 8a097ed..d994a19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.util.GridConcurrentFactory.*;
/**
@@ -76,6 +77,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
new ConcurrentHashMap8<>();
+ /** Demand lock. */
+ private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -250,7 +254,115 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
- return demander.assign(exchFut);
+ // No assignments for disabled preloader.
+ GridDhtPartitionTopology top = cctx.dht().topology();
+
+ if (!cctx.rebalanceEnabled())
+ return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+ int partCnt = cctx.affinity().partitions();
+
+ assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+ exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+ "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+ ", topVer=" + top.topologyVersion() + ']';
+
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+ AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+ for (int p = 0; p < partCnt; p++) {
+ if (cctx.shared().exchange().hasPendingExchange()) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+ exchFut.exchangeId());
+
+ break;
+ }
+
+ // If partition belongs to local node.
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+ assert part != null;
+ assert part.id() == p;
+
+ if (part.state() != MOVING) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+ continue; // For.
+ }
+
+ Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+ if (picked.isEmpty()) {
+ top.own(part);
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+ DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+ cctx.events().addPreloadEvent(p,
+ EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+ discoEvt.type(), discoEvt.timestamp());
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Owning partition as there are no other owners: " + part);
+ }
+ else {
+ ClusterNode n = F.first(picked);
+
+ GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+ if (msg == null) {
+ assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+ top.updateSequence(),
+ exchFut.exchangeId().topologyVersion(),
+ cctx.cacheId()));
+ }
+
+ msg.addPartition(p);
+ }
+ }
+ }
+
+ return assigns;
+ }
+
+ /**
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Picked owners.
+ */
+ private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+ int affCnt = affNodes.size();
+
+ Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+ int rmtCnt = rmts.size();
+
+ if (rmtCnt <= affCnt)
+ return rmts;
+
+ List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+ // Sort in descending order, so nodes with higher order will be first.
+ Collections.sort(sorted, CU.nodeComparator(false));
+
+ // Pick newest nodes.
+ return sorted.subList(0, affCnt);
+ }
+
+ /**
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Nodes owning this partition.
+ */
+ private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+ return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
}
/** {@inheritDoc} */
@@ -531,7 +643,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void unwindUndeploys() {
- demander.unwindUndeploys();
+ demandLock.writeLock().lock();
+
+ try {
+ cctx.deploy().unwind(cctx);
+ }
+ finally {
+ demandLock.writeLock().unlock();
+ }
}
/**
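
assign(), as moved into GridDhtPreloader above, builds one demand message per supplier: for every local MOVING partition it takes the remote owners, sorts them so that nodes with the highest order come first, caps the list at the partition's affinity node count and demands from the first of them; if there are no remote owners at all, the partition is simply owned locally. A stand-alone sketch of the owner-picking step (OwnerPicker and Node are illustrative stand-ins for Ignite's cluster API):

    import java.util.*;

    /** Illustrative owner picking: prefer the newest remote owners, capped by the affinity size. */
    class OwnerPicker {
        /** Stand-in for a cluster node: just an id and a join order. */
        static class Node {
            final String id;
            final long order;

            Node(String id, long order) {
                this.id = id;
                this.order = order;
            }
        }

        /**
         * @param remoteOwners Remote nodes currently owning the partition.
         * @param affinityCount Number of affinity nodes for the partition.
         * @return At most {@code affinityCount} owners, newest (highest order) first.
         */
        static List<Node> pickOwners(Collection<Node> remoteOwners, int affinityCount) {
            if (remoteOwners.size() <= affinityCount)
                return new ArrayList<>(remoteOwners);

            List<Node> sorted = new ArrayList<>(remoteOwners);

            // Sort in descending order, so nodes with a higher order come first.
            Collections.sort(sorted, new Comparator<Node>() {
                @Override public int compare(Node a, Node b) {
                    return Long.compare(b.order, a.order);
                }
            });

            // Pick the newest nodes.
            return new ArrayList<>(sorted.subList(0, affinityCount));
        }
    }
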
[3/8] incubator-ignite git commit: ignite-1093 Code cleanup
Posted by av...@apache.org.
ignite-1093 Code cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9fbb5597
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9fbb5597
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9fbb5597
Branch: refs/heads/ignite-1093
Commit: 9fbb5597293a6805dad8fb10dec24b5628bb201f
Parents: 4776fec
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Mon Aug 10 12:08:20 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Mon Aug 10 12:08:20 2015 +0300
----------------------------------------------------------------------
.../preloader/GridDhtPartitionDemandPool.java | 1127 ------------------
.../dht/preloader/GridDhtPartitionDemander.java | 1127 ++++++++++++++++++
.../dht/preloader/GridDhtPartitionSupplier.java | 582 +++++++++
.../preloader/GridDhtPartitionSupplyPool.java | 576 ---------
.../dht/preloader/GridDhtPreloader.java | 40 +-
.../GridCacheMassiveRebalancingSelfTest.java | 2 +-
6 files changed, 1730 insertions(+), 1724 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
deleted file mode 100644
index 11645e9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ /dev/null
@@ -1,1127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.timeout.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-import static org.apache.ignite.internal.processors.dr.GridDrType.*;
-
-/**
- * Thread pool for requesting partitions from other nodes
- * and populating local cache.
- */
-@SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool {
- /** Dummy message to wake up a blocking queue if a node leaves. */
- private final SupplyMessage DUMMY_TOP = new SupplyMessage();
-
- /** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final ReadWriteLock busyLock;
-
- /** */
- @GridToStringInclude
- private final Collection<DemandWorker> dmdWorkers;
-
- /** Preload predicate. */
- private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
- /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
- @GridToStringInclude
- private SyncFuture syncFut;
-
- /** Preload timeout. */
- private final AtomicLong timeout;
-
- /** Allows demand threads to synchronize their step. */
- private CyclicBarrier barrier;
-
- /** Demand lock. */
- private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
-
- /** */
- private int poolSize;
-
- /** Last timeout object. */
- private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
-
- /** Last exchange future. */
- private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
-
- /**
- * @param cctx Cache context.
- * @param busyLock Shutdown lock.
- */
- public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
- assert cctx != null;
- assert busyLock != null;
-
- this.cctx = cctx;
- this.busyLock = busyLock;
-
- log = cctx.logger(getClass());
-
- boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
-
- poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
- if (enabled) {
- barrier = new CyclicBarrier(poolSize);
-
- dmdWorkers = new ArrayList<>(poolSize);
-
- for (int i = 0; i < poolSize; i++)
- dmdWorkers.add(new DemandWorker(i));
-
- syncFut = new SyncFuture(dmdWorkers);
- }
- else {
- dmdWorkers = Collections.emptyList();
-
- syncFut = new SyncFuture(dmdWorkers);
-
- // Calling onDone() immediately since preloading is disabled.
- syncFut.onDone();
- }
-
- timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
- }
-
- /**
- *
- */
- void start() {
- if (poolSize > 0) {
- for (DemandWorker w : dmdWorkers)
- new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
- }
- }
-
- /**
- *
- */
- void stop() {
- U.cancel(dmdWorkers);
-
- if (log.isDebugEnabled())
- log.debug("Before joining on demand workers: " + dmdWorkers);
-
- U.join(dmdWorkers, log);
-
- if (log.isDebugEnabled())
- log.debug("After joining on demand workers: " + dmdWorkers);
-
- lastExchangeFut = null;
-
- lastTimeoutObj.set(null);
- }
-
- /**
- * @return Future for {@link CacheRebalanceMode#SYNC} mode.
- */
- IgniteInternalFuture<?> syncFuture() {
- return syncFut;
- }
-
- /**
- * Sets preload predicate for demand pool.
- *
- * @param preloadPred Preload predicate.
- */
- void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- /**
- * @return Size of this thread pool.
- */
- int poolSize() {
- return poolSize;
- }
-
- /**
- * Force preload.
- */
- void forcePreload() {
- GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
-
- if (obj != null)
- cctx.time().removeTimeoutObject(obj);
-
- final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
- if (exchFut != null) {
- if (log.isDebugEnabled())
- log.debug("Forcing rebalance event for future: " + exchFut);
-
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.shared().exchange().forcePreloadExchange(exchFut);
- }
- });
- }
- else if (log.isDebugEnabled())
- log.debug("Ignoring force rebalance request (no topology event happened yet).");
- }
-
- /**
- * @return {@code true} if entered to busy state.
- */
- private boolean enterBusy() {
- if (busyLock.readLock().tryLock())
- return true;
-
- if (log.isDebugEnabled())
- log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
-
- return false;
- }
-
- /**
- * @param idx
- * @return topic
- */
- static Object topic(int idx, int cacheId, UUID nodeId) {
- return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: remove nodeId
- }
-
- /**
- *
- */
- private void leaveBusy() {
- busyLock.readLock().unlock();
- }
-
- /**
- * @param type Type.
- * @param discoEvt Discovery event.
- */
- private void preloadEvent(int type, DiscoveryEvent discoEvt) {
- preloadEvent(-1, type, discoEvt);
- }
-
- /**
- * @param part Partition.
- * @param type Type.
- * @param discoEvt Discovery event.
- */
- private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
- assert discoEvt != null;
-
- cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
- }
-
- /**
- * @param msg Message to check.
- * @return {@code True} if dummy message.
- */
- private boolean dummyTopology(SupplyMessage msg) {
- return msg == DUMMY_TOP;
- }
-
- /**
- * @param deque Deque to poll from.
- * @param time Time to wait.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(time, MILLISECONDS);
- }
-
- /**
- * @param p Partition.
- * @param topVer Topology version.
- * @return Picked owners.
- */
- private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
- Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
- int affCnt = affNodes.size();
-
- Collection<ClusterNode> rmts = remoteOwners(p, topVer);
-
- int rmtCnt = rmts.size();
-
- if (rmtCnt <= affCnt)
- return rmts;
-
- List<ClusterNode> sorted = new ArrayList<>(rmts);
-
- // Sort in descending order, so nodes with higher order will be first.
- Collections.sort(sorted, CU.nodeComparator(false));
-
- // Pick newest nodes.
- return sorted.subList(0, affCnt);
- }
-
- /**
- * @param p Partition.
- * @param topVer Topology version.
- * @return Nodes owning this partition.
- */
- private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
- return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
- }
-
- /**
- * @param assigns Assignments.
- * @param force {@code True} if dummy reassign.
- */
- void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
- if (log.isDebugEnabled())
- log.debug("Adding partition assignments: " + assigns);
-
- long delay = cctx.config().getRebalanceDelay();
-
- if (delay == 0 || force) {
- assert assigns != null;
-
- synchronized (dmdWorkers) {
- for (DemandWorker w : dmdWorkers)
- w.addAssignments(assigns);
- }
- }
- else if (delay > 0) {
- assert !force;
-
- GridTimeoutObject obj = lastTimeoutObj.get();
-
- if (obj != null)
- cctx.time().removeTimeoutObject(obj);
-
- final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
- assert exchFut != null : "Delaying rebalance process without topology event.";
-
- obj = new GridTimeoutObjectAdapter(delay) {
- @Override public void onTimeout() {
- exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
- cctx.shared().exchange().forcePreloadExchange(exchFut);
- }
- });
- }
- };
-
- lastTimeoutObj.set(obj);
-
- cctx.time().addTimeoutObject(obj);
- }
- }
-
- /**
- *
- */
- void unwindUndeploys() {
- demandLock.writeLock().lock();
-
- try {
- cctx.deploy().unwind(cctx);
- }
- finally {
- demandLock.writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtPartitionDemandPool.class, this);
- }
-
- /**
- *
- */
- private class DemandWorker extends GridWorker {
- /** Worker ID. */
- private int id;
-
- /** Partition-to-node assignments. */
- private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
-
- /** Hide worker logger and use cache logger instead. */
- private IgniteLogger log = GridDhtPartitionDemandPool.this.log;
-
- /**
- * @param id Worker ID.
- */
- private DemandWorker(int id) {
- super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemandPool.this.log);
-
- assert id >= 0;
-
- this.id = id;
- }
-
- /**
- * @param assigns Assignments.
- */
- void addAssignments(GridDhtPreloaderAssignments assigns) {
- assert assigns != null;
-
- assignQ.offer(assigns);
-
- if (log.isDebugEnabled())
- log.debug("Added assignments to worker: " + this);
- }
-
- /**
- * @return {@code True} if topology changed.
- */
- private boolean topologyChanged() {
- return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
- }
-
- /**
- * @param pick Node picked for preloading.
- * @param p Partition.
- * @param entry Preloaded entry.
- * @param topVer Topology version.
- * @return {@code False} if partition has become invalid during preloading.
- * @throws IgniteInterruptedCheckedException If interrupted.
- */
- private boolean preloadEntry(
- ClusterNode pick,
- int p,
- GridCacheEntryInfo entry,
- AffinityTopologyVersion topVer
- ) throws IgniteCheckedException {
- try {
- GridCacheEntryEx cached = null;
-
- try {
- cached = cctx.dht().entryEx(entry.key());
-
- if (log.isDebugEnabled())
- log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
-
- if (cctx.dht().isIgfsDataCache() &&
- cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
- LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
- "value, will ignore rebalance entries): " + name());
-
- if (cached.markObsoleteIfEmpty(null))
- cached.context().cache().removeIfObsolete(cached.key());
-
- return true;
- }
-
- if (preloadPred == null || preloadPred.apply(entry)) {
- if (cached.initialValue(
- entry.value(),
- entry.version(),
- entry.ttl(),
- entry.expireTime(),
- true,
- topVer,
- cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
- )) {
- cctx.evicts().touch(cached, topVer); // Start tracking.
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
- cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
- (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
- false, null, null, null);
- }
- else if (log.isDebugEnabled())
- log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
- ", part=" + p + ']');
- }
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
- cached.key() + ", part=" + p + ']');
- }
- catch (GridDhtInvalidPartitionException ignored) {
- if (log.isDebugEnabled())
- log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
- return false;
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
- cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
- }
-
- return true;
- }
-
- /**
- * @param node Node to demand from.
- * @param topVer Topology version.
- * @param d Demand message.
- * @param exchFut Exchange future.
- * @return Missed partitions.
- * @throws InterruptedException If interrupted.
- * @throws ClusterTopologyCheckedException If node left.
- * @throws IgniteCheckedException If failed to send message.
- */
- private Set<Integer> demandFromNode(
- final ClusterNode node,
- final AffinityTopologyVersion topVer,
- final GridDhtPartitionDemandMessage d,
- final GridDhtPartitionsExchangeFuture exchFut
- ) throws InterruptedException, IgniteCheckedException {
- final GridDhtPartitionTopology top = cctx.dht().topology();
-
- long timeout = GridDhtPartitionDemandPool.this.timeout.get();
-
- d.timeout(timeout);
- d.workerId(id);
-
- final Set<Integer> missed = new HashSet<>();
-
- final ConcurrentHashMap8<Integer, Boolean> remaining = new ConcurrentHashMap8<>();
-
- for (int p : d.partitions())
- remaining.put(p, false);
-
- if (isCancelled() || topologyChanged())
- return missed;
-
- int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count
-
- List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
-
- int cnt = 0;
-
- while (cnt < threadCnt) {
- sParts.add(new HashSet<Integer>());
-
- final int idx = cnt;
-
- cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
- handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
- exchFut, missed, d, remaining);
- }
- });
-
- cnt++;
- }
-
- Iterator<Integer> it = d.partitions().iterator();
-
- cnt = 0;
-
- while (it.hasNext()) {
- sParts.get(cnt % threadCnt).add(it.next());
-
- cnt++;
- }
-
- try {
- cnt = 0;
-
- while (cnt < threadCnt) {
-
- // Create copy.
- GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
-
- initD.topic(topic(cnt, cctx.cacheId(),node.id()));
-
- try {
- if (logg && cctx.name().equals("cache"))
- System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id());
-
- cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send partition demand message to local node", e);
- }
-
- cnt++;
- }
-
- do {
- U.sleep(1000);//Todo: improve
- }
- while (!isCancelled() && !topologyChanged() && !remaining.isEmpty());
-
- return missed;
- }
- finally {
- cnt = 0;
-
- while (cnt < threadCnt) {
- cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), node.id()));
-
- cnt++;
- }
- }
- }
-
- boolean logg = false;
-
- /**
- * @param s Supply message.
- * @param node Node.
- * @param topVer Topology version.
- * @param top Topology.
- * @param exchFut Exchange future.
- * @param missed Missed.
- * @param d initial DemandMessage.
- */
- private void handleSupplyMessage(
- int idx,
- SupplyMessage s,
- ClusterNode node,
- AffinityTopologyVersion topVer,
- GridDhtPartitionTopology top,
- GridDhtPartitionsExchangeFuture exchFut,
- Set<Integer> missed,
- GridDhtPartitionDemandMessage d,
- ConcurrentHashMap8 remaining) {
-
- if (logg && cctx.name().equals("cache"))
- System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id());
-
- // Check that message was received from expected node.
- if (!s.senderId().equals(node.id())) {
- U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
- ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
-
- return;
- }
-
- if (topologyChanged())
- return;
-
- if (log.isDebugEnabled())
- log.debug("Received supply message: " + s);
-
- GridDhtPartitionSupplyMessage supply = s.supply();
-
- // Check whether there were class loading errors on unmarshal
- if (supply.classError() != null) {
- if (log.isDebugEnabled())
- log.debug("Class got undeployed during preloading: " + supply.classError());
-
- return;
- }
-
- // Preload.
- for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
- int p = e.getKey();
-
- if (cctx.affinity().localNode(p, topVer)) {
- GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
- assert part != null;
-
- if (part.state() == MOVING) {
- boolean reserved = part.reserve();
-
- assert reserved : "Failed to reserve partition [gridName=" +
- cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
-
- part.lock();
-
- try {
- // Loop through all received entries and try to preload them.
- for (GridCacheEntryInfo entry : e.getValue().infos()) {
- if (!part.preloadingPermitted(entry.key(), entry.version())) {
- if (log.isDebugEnabled())
- log.debug("Preloading is not permitted for entry due to " +
- "evictions [key=" + entry.key() +
- ", ver=" + entry.version() + ']');
-
- continue;
- }
- try {
- if (!preloadEntry(node, p, entry, topVer)) {
- if (log.isDebugEnabled())
- log.debug("Got entries for invalid partition during " +
- "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-
- break;
- }
- }
- catch (IgniteCheckedException ex) {
- cancel();
-
- return;
- }
- }
-
- boolean last = supply.last().contains(p);
-
- // If message was last for this partition,
- // then we take ownership.
- if (last) {
- top.own(part);//todo: close future?
-
-// if (logg && cctx.name().equals("cache"))
-// System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
-
- remaining.remove(p);
-
- if (log.isDebugEnabled())
- log.debug("Finished rebalancing partition: " + part);
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- exchFut.discoveryEvent());
- }
- }
- finally {
- part.unlock();
- part.release();
- }
- }
- else {
- remaining.remove(p);
-
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
- }
- }
- else {
- remaining.remove(p);
-
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
- }
- }
-
- for (Integer miss : s.supply().missed())
- remaining.remove(miss);
-
- // Only request partitions based on latest topology version.
- for (Integer miss : s.supply().missed())
- if (cctx.affinity().localNode(miss, topVer))
- missed.add(miss);
-
- if (!remaining.isEmpty()) {
- try {
- // Create copy.
- GridDhtPartitionDemandMessage nextD =
- new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
-
- nextD.topic(topic(idx, cctx.cacheId(), node.id()));
-
- // Send demand message.
- cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(idx, cctx.cacheId()),
- nextD, cctx.ioPolicy(), d.timeout());
-
- if (logg && cctx.name().equals("cache"))
- System.out.println("D " + idx + " ack " + cctx.localNode().id());
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to receive partitions from node (rebalancing will not " +
- "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
-
- cancel();
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- try {
- int rebalanceOrder = cctx.config().getRebalanceOrder();
-
- if (!CU.isMarshallerCache(cctx.name())) {
- if (log.isDebugEnabled())
- log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
-
- try {
- cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ']');
-
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
- }
- }
-
- if (rebalanceOrder > 0) {
- IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
-
- try {
- if (fut != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
- ", rebalanceOrder=" + rebalanceOrder + ']');
-
- fut.get();
- }
- }
- catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
- "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-
- return;
- }
- catch (IgniteCheckedException e) {
- throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
- }
- }
-
- GridDhtPartitionsExchangeFuture exchFut = null;
-
- boolean stopEvtFired = false;
-
- while (!isCancelled()) {
- try {
- barrier.await();
-
- if (id == 0 && exchFut != null && !exchFut.dummy() &&
- cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
-
- if (!cctx.isReplicated() || !stopEvtFired) {
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
- stopEvtFired = true;
- }
- }
- }
- catch (BrokenBarrierException ignore) {
- throw new InterruptedException("Demand worker stopped.");
- }
-
- // Sync up all demand threads at this step.
- GridDhtPreloaderAssignments assigns = null;
-
- while (assigns == null)
- assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
-
- demandLock.readLock().lock();
-
- try {
- exchFut = assigns.exchangeFuture();
-
- // Assignments are empty if preloading is disabled.
- if (assigns.isEmpty())
- continue;
-
- boolean resync = false;
-
- // While.
- // =====
- while (!isCancelled() && !topologyChanged() && !resync) {
- Collection<Integer> missed = new HashSet<>();
-
- // For.
- // ===
- for (ClusterNode node : assigns.keySet()) {
- if (topologyChanged() || isCancelled())
- break; // For.
-
- GridDhtPartitionDemandMessage d = assigns.remove(node);
-
- // If another thread is already processing this message,
- // move to the next node.
- if (d == null)
- continue; // For.
-
- try {
- Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
-
- if (!set.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
- set + ']');
-
- missed.addAll(set);
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
- ", msg=" + e.getMessage() + ']');
-
- resync = true;
-
- break; // For.
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to receive partitions from node (rebalancing will not " +
- "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
- }
- }
-
- // Processed missed entries.
- if (!missed.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Reassigning partitions that were missed: " + missed);
-
- assert exchFut.exchangeId() != null;
-
- cctx.shared().exchange().forceDummyExchange(true, exchFut);
- }
- else
- break; // While.
- }
- }
- finally {
- demandLock.readLock().unlock();
-
- syncFut.onWorkerDone(this);
- }
-
- cctx.shared().exchange().scheduleResendPartitions();
- }
- }
- finally {
- // Safety.
- syncFut.onWorkerDone(this);
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DemandWorker.class, this, "assignQ", assignQ, "super", super.toString());
- }
- }
-
- /**
- * Sets last exchange future.
- *
- * @param lastFut Last future to set.
- */
- void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
- lastExchangeFut = lastFut;
- }
-
- /**
- * @param exchFut Exchange future.
- * @return Assignments of partitions to nodes.
- */
- GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
- // No assignments for disabled preloader.
- GridDhtPartitionTopology top = cctx.dht().topology();
-
- if (!cctx.rebalanceEnabled())
- return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
- int partCnt = cctx.affinity().partitions();
-
- assert exchFut.forcePreload() || exchFut.dummyReassign() ||
- exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
- "Topology version mismatch [exchId=" + exchFut.exchangeId() +
- ", topVer=" + top.topologyVersion() + ']';
-
- GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
- AffinityTopologyVersion topVer = assigns.topologyVersion();
-
- for (int p = 0; p < partCnt; p++) {
- if (cctx.shared().exchange().hasPendingExchange()) {
- if (log.isDebugEnabled())
- log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
- exchFut.exchangeId());
-
- break;
- }
-
- // If partition belongs to local node.
- if (cctx.affinity().localNode(p, topVer)) {
- GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
- assert part != null;
- assert part.id() == p;
-
- if (part.state() != MOVING) {
- if (log.isDebugEnabled())
- log.debug("Skipping partition assignment (state is not MOVING): " + part);
-
- continue; // For.
- }
-
- Collection<ClusterNode> picked = pickedOwners(p, topVer);
-
- if (picked.isEmpty()) {
- top.own(part);
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- cctx.events().addPreloadEvent(p,
- EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
- discoEvt.type(), discoEvt.timestamp());
- }
-
- if (log.isDebugEnabled())
- log.debug("Owning partition as there are no other owners: " + part);
- }
- else {
- ClusterNode n = F.first(picked);
-
- GridDhtPartitionDemandMessage msg = assigns.get(n);
-
- if (msg == null) {
- assigns.put(n, msg = new GridDhtPartitionDemandMessage(
- top.updateSequence(),
- exchFut.exchangeId().topologyVersion(),
- cctx.cacheId()));
- }
-
- msg.addPartition(p);
- }
- }
- }
-
- return assigns;
- }
-
- /**
- *
- */
- private class SyncFuture extends GridFutureAdapter<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Remaining workers. */
- private Collection<DemandWorker> remaining;
-
- /**
- * @param workers List of workers.
- */
- private SyncFuture(Collection<DemandWorker> workers) {
- assert workers.size() == poolSize();
-
- remaining = Collections.synchronizedList(new LinkedList<>(workers));
- }
-
- /**
- * @param w Worker who iterated through all partitions.
- */
- void onWorkerDone(DemandWorker w) {
- if (isDone())
- return;
-
- if (remaining.remove(w))
- if (log.isDebugEnabled())
- log.debug("Completed full partition iteration for worker [worker=" + w + ']');
-
- if (remaining.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Completed sync future.");
-
- onDone();
- }
- }
- }
-
- /**
- * Supply message wrapper.
- */
- private static class SupplyMessage {
- /** Sender ID. */
- private UUID sndId;
-
- /** Supply message. */
- private GridDhtPartitionSupplyMessage supply;
-
- /**
- * Dummy constructor.
- */
- private SupplyMessage() {
- // No-op.
- }
-
- /**
- * @param sndId Sender ID.
- * @param supply Supply message.
- */
- SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
- this.sndId = sndId;
- this.supply = supply;
- }
-
- /**
- * @return Sender ID.
- */
- UUID senderId() {
- return sndId;
- }
-
- /**
- * @return Message.
- */
- GridDhtPartitionSupplyMessage supply() {
- return supply;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SupplyMessage.class, this);
- }
- }
-}
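The demandFromNode() logic shared by the removed GridDhtPartitionDemandPool and the new GridDhtPartitionDemander splits the demanded partitions round-robin across getRebalanceThreadPoolSize() ordered message topics, so supply messages for different partition groups can be handled in parallel. A minimal standalone sketch of just that splitting step (splitRoundRobin is an illustrative name, not Ignite API):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class PartitionSplit {
        /**
         * @param parts Partitions demanded from a single supplier node.
         * @param threadCnt Rebalance thread pool size (one ordered topic per index).
         * @return threadCnt sets of partitions; set i is handled by the handler
         *         registered on topic index i.
         */
        static List<Set<Integer>> splitRoundRobin(Collection<Integer> parts, int threadCnt) {
            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);

            for (int i = 0; i < threadCnt; i++)
                sParts.add(new HashSet<Integer>());

            int cnt = 0;

            // Distribute partitions round-robin across the per-thread sets.
            for (Integer p : parts)
                sParts.get(cnt++ % threadCnt).add(p);

            return sParts;
        }
    }

For example, partitions 0 through 9 with threadCnt = 3 split into {0, 3, 6, 9}, {1, 4, 7} and {2, 5, 8}; each set goes out in its own copy of the demand message, addressed to the matching per-index topic on the supplier side.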
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
new file mode 100644
index 0000000..711b69b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -0,0 +1,1127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+
+/**
+ * Thread pool for requesting partitions from other nodes
+ * and populating local cache.
+ */
+@SuppressWarnings("NonConstantFieldWithUpperCaseName")
+public class GridDhtPartitionDemander {
+ /** Dummy message to wake up a blocking queue if a node leaves. */
+ private final SupplyMessage DUMMY_TOP = new SupplyMessage();
+
+ /** */
+ private final GridCacheContext<?, ?> cctx;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final ReadWriteLock busyLock;
+
+ /** */
+ @GridToStringInclude
+ private final Collection<DemandWorker> dmdWorkers;
+
+ /** Preload predicate. */
+ private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+ /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
+ @GridToStringInclude
+ private SyncFuture syncFut;
+
+ /** Preload timeout. */
+ private final AtomicLong timeout;
+
+ /** Allows demand threads to synchronize their step. */
+ private CyclicBarrier barrier;
+
+ /** Demand lock. */
+ private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
+ /** */
+ private int poolSize;
+
+ /** Last timeout object. */
+ private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
+
+ /** Last exchange future. */
+ private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+
+ /**
+ * @param cctx Cache context.
+ * @param busyLock Shutdown lock.
+ */
+ public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+ assert cctx != null;
+ assert busyLock != null;
+
+ this.cctx = cctx;
+ this.busyLock = busyLock;
+
+ log = cctx.logger(getClass());
+
+ boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
+
+ poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
+
+ if (enabled) {
+ barrier = new CyclicBarrier(poolSize);
+
+ dmdWorkers = new ArrayList<>(poolSize);
+
+ for (int i = 0; i < poolSize; i++)
+ dmdWorkers.add(new DemandWorker(i));
+
+ syncFut = new SyncFuture(dmdWorkers);
+ }
+ else {
+ dmdWorkers = Collections.emptyList();
+
+ syncFut = new SyncFuture(dmdWorkers);
+
+ // Calling onDone() immediately since preloading is disabled.
+ syncFut.onDone();
+ }
+
+ timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
+ }
+
+ /**
+ *
+ */
+ void start() {
+ if (poolSize > 0) {
+ for (DemandWorker w : dmdWorkers)
+ new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
+ }
+ }
+
+ /**
+ *
+ */
+ void stop() {
+ U.cancel(dmdWorkers);
+
+ if (log.isDebugEnabled())
+ log.debug("Before joining on demand workers: " + dmdWorkers);
+
+ U.join(dmdWorkers, log);
+
+ if (log.isDebugEnabled())
+ log.debug("After joining on demand workers: " + dmdWorkers);
+
+ lastExchangeFut = null;
+
+ lastTimeoutObj.set(null);
+ }
+
+ /**
+ * @return Future for {@link CacheRebalanceMode#SYNC} mode.
+ */
+ IgniteInternalFuture<?> syncFuture() {
+ return syncFut;
+ }
+
+ /**
+ * Sets preload predicate for demand pool.
+ *
+ * @param preloadPred Preload predicate.
+ */
+ void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+ this.preloadPred = preloadPred;
+ }
+
+ /**
+ * @return Size of this thread pool.
+ */
+ int poolSize() {
+ return poolSize;
+ }
+
+ /**
+ * Force preload.
+ */
+ void forcePreload() {
+ GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
+
+ if (obj != null)
+ cctx.time().removeTimeoutObject(obj);
+
+ final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+ if (exchFut != null) {
+ if (log.isDebugEnabled())
+ log.debug("Forcing rebalance event for future: " + exchFut);
+
+ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.shared().exchange().forcePreloadExchange(exchFut);
+ }
+ });
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Ignoring force rebalance request (no topology event happened yet).");
+ }
+
+ /**
+ * @return {@code true} if entered to busy state.
+ */
+ private boolean enterBusy() {
+ if (busyLock.readLock().tryLock())
+ return true;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
+
+ return false;
+ }
+
+ /**
+ * @param idx Index.
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ * @return Ordered message topic.
+ */
+ static Object topic(int idx, int cacheId, UUID nodeId) {
+ return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: remove nodeId
+ }
+
+ /**
+ *
+ */
+ private void leaveBusy() {
+ busyLock.readLock().unlock();
+ }
+
+ /**
+ * @param type Type.
+ * @param discoEvt Discovery event.
+ */
+ private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+ preloadEvent(-1, type, discoEvt);
+ }
+
+ /**
+ * @param part Partition.
+ * @param type Type.
+ * @param discoEvt Discovery event.
+ */
+ private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+ assert discoEvt != null;
+
+ cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+ }
+
+ /**
+ * @param msg Message to check.
+ * @return {@code True} if dummy message.
+ */
+ private boolean dummyTopology(SupplyMessage msg) {
+ return msg == DUMMY_TOP;
+ }
+
+ /**
+ * @param deque Deque to poll from.
+ * @param time Time to wait.
+ * @param w Worker.
+ * @return Polled item.
+ * @throws InterruptedException If interrupted.
+ */
+ @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
+ assert w != null;
+
+ // There is currently a case where {@code interrupted}
+ // flag on a thread gets flipped during stop which causes the pool to hang. This check
+ // will always make sure that interrupted flag gets reset before going into wait conditions.
+ // The true fix should actually make sure that interrupted flag does not get reset or that
+ // interrupted exception gets propagated. Until we find a real fix, this method should
+ // always work to make sure that there is no hanging during stop.
+ if (w.isCancelled())
+ Thread.currentThread().interrupt();
+
+ return deque.poll(time, MILLISECONDS);
+ }
+
+ /**
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Picked owners.
+ */
+ private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+ int affCnt = affNodes.size();
+
+ Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+ int rmtCnt = rmts.size();
+
+ if (rmtCnt <= affCnt)
+ return rmts;
+
+ List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+ // Sort in descending order, so nodes with higher order will be first.
+ Collections.sort(sorted, CU.nodeComparator(false));
+
+ // Pick newest nodes.
+ return sorted.subList(0, affCnt);
+ }
+
+ /**
+ * @param p Partition.
+ * @param topVer Topology version.
+ * @return Nodes owning this partition.
+ */
+ private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+ return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
+ }
+
+ /**
+ * @param assigns Assignments.
+ * @param force {@code True} if dummy reassign.
+ */
+ void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
+ if (log.isDebugEnabled())
+ log.debug("Adding partition assignments: " + assigns);
+
+ long delay = cctx.config().getRebalanceDelay();
+
+ if (delay == 0 || force) {
+ assert assigns != null;
+
+ synchronized (dmdWorkers) {
+ for (DemandWorker w : dmdWorkers)
+ w.addAssignments(assigns);
+ }
+ }
+ else if (delay > 0) {
+ assert !force;
+
+ GridTimeoutObject obj = lastTimeoutObj.get();
+
+ if (obj != null)
+ cctx.time().removeTimeoutObject(obj);
+
+ final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+ assert exchFut != null : "Delaying rebalance process without topology event.";
+
+ obj = new GridTimeoutObjectAdapter(delay) {
+ @Override public void onTimeout() {
+ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+ cctx.shared().exchange().forcePreloadExchange(exchFut);
+ }
+ });
+ }
+ };
+
+ lastTimeoutObj.set(obj);
+
+ cctx.time().addTimeoutObject(obj);
+ }
+ }
+
+ /**
+ *
+ */
+ void unwindUndeploys() {
+ demandLock.writeLock().lock();
+
+ try {
+ cctx.deploy().unwind(cctx);
+ }
+ finally {
+ demandLock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtPartitionDemander.class, this);
+ }
+
+ /**
+ *
+ */
+ private class DemandWorker extends GridWorker {
+ /** Worker ID. */
+ private int id;
+
+ /** Partition-to-node assignments. */
+ private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+ /** Hide worker logger and use cache logger instead. */
+ private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+ /**
+ * @param id Worker ID.
+ */
+ private DemandWorker(int id) {
+ super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemander.this.log);
+
+ assert id >= 0;
+
+ this.id = id;
+ }
+
+ /**
+ * @param assigns Assignments.
+ */
+ void addAssignments(GridDhtPreloaderAssignments assigns) {
+ assert assigns != null;
+
+ assignQ.offer(assigns);
+
+ if (log.isDebugEnabled())
+ log.debug("Added assignments to worker: " + this);
+ }
+
+ /**
+ * @return {@code True} if topology changed.
+ */
+ private boolean topologyChanged() {
+ return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
+ }
+
+ /**
+ * @param pick Node picked for preloading.
+ * @param p Partition.
+ * @param entry Preloaded entry.
+ * @param topVer Topology version.
+ * @return {@code False} if partition has become invalid during preloading.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ private boolean preloadEntry(
+ ClusterNode pick,
+ int p,
+ GridCacheEntryInfo entry,
+ AffinityTopologyVersion topVer
+ ) throws IgniteCheckedException {
+ try {
+ GridCacheEntryEx cached = null;
+
+ try {
+ cached = cctx.dht().entryEx(entry.key());
+
+ if (log.isDebugEnabled())
+ log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+
+ if (cctx.dht().isIgfsDataCache() &&
+ cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+ LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+ "value, will ignore rebalance entries): " + name());
+
+ if (cached.markObsoleteIfEmpty(null))
+ cached.context().cache().removeIfObsolete(cached.key());
+
+ return true;
+ }
+
+ if (preloadPred == null || preloadPred.apply(entry)) {
+ if (cached.initialValue(
+ entry.value(),
+ entry.version(),
+ entry.ttl(),
+ entry.expireTime(),
+ true,
+ topVer,
+ cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+ )) {
+ cctx.evicts().touch(cached, topVer); // Start tracking.
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+ cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+ (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+ false, null, null, null);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+ ", part=" + p + ']');
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+ cached.key() + ", part=" + p + ']');
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+
+ return false;
+ }
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw e;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+ cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+ }
+
+ return true;
+ }
+
+ /**
+ * @param node Node to demand from.
+ * @param topVer Topology version.
+ * @param d Demand message.
+ * @param exchFut Exchange future.
+ * @return Missed partitions.
+ * @throws InterruptedException If interrupted.
+ * @throws ClusterTopologyCheckedException If node left.
+ * @throws IgniteCheckedException If failed to send message.
+ */
+ private Set<Integer> demandFromNode(
+ final ClusterNode node,
+ final AffinityTopologyVersion topVer,
+ final GridDhtPartitionDemandMessage d,
+ final GridDhtPartitionsExchangeFuture exchFut
+ ) throws InterruptedException, IgniteCheckedException {
+ final GridDhtPartitionTopology top = cctx.dht().topology();
+
+ long timeout = GridDhtPartitionDemander.this.timeout.get();
+
+ d.timeout(timeout);
+ d.workerId(id);
+
+ final Set<Integer> missed = new HashSet<>();
+
+ final ConcurrentHashMap8<Integer, Boolean> remaining = new ConcurrentHashMap8<>();
+
+ for (int p : d.partitions())
+ remaining.put(p, false);
+
+ if (isCancelled() || topologyChanged())
+ return missed;
+
+ int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count
+
+ List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
+
+ int cnt = 0;
+
+ while (cnt < threadCnt) {
+ sParts.add(new HashSet<Integer>());
+
+ final int idx = cnt;
+
+ cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
+ handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
+ exchFut, missed, d, remaining);
+ }
+ });
+
+ cnt++;
+ }
+
+ Iterator<Integer> it = d.partitions().iterator();
+
+ cnt = 0;
+
+ while (it.hasNext()) {
+ sParts.get(cnt % threadCnt).add(it.next());
+
+ cnt++;
+ }
+
+ try {
+ cnt = 0;
+
+ while (cnt < threadCnt) {
+
+ // Create copy.
+ GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+ initD.topic(topic(cnt, cctx.cacheId(),node.id()));
+
+ try {
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id());
+
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partition demand message to local node", e);
+ }
+
+ cnt++;
+ }
+
+ do {
+ U.sleep(1000);//Todo: improve
+ }
+ while (!isCancelled() && !topologyChanged() && !remaining.isEmpty());
+
+ return missed;
+ }
+ finally {
+ cnt = 0;
+
+ while (cnt < threadCnt) {
+ cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), node.id()));
+
+ cnt++;
+ }
+ }
+ }
+
+ boolean logg = false;
+
+ /**
+ * @param idx Topic index this handler is registered on.
+ * @param s Supply message.
+ * @param node Node.
+ * @param topVer Topology version.
+ * @param top Topology.
+ * @param exchFut Exchange future.
+ * @param missed Missed partitions.
+ * @param d Initial demand message.
+ * @param remaining Remaining partitions to rebalance from this node.
+ */
+ private void handleSupplyMessage(
+ int idx,
+ SupplyMessage s,
+ ClusterNode node,
+ AffinityTopologyVersion topVer,
+ GridDhtPartitionTopology top,
+ GridDhtPartitionsExchangeFuture exchFut,
+ Set<Integer> missed,
+ GridDhtPartitionDemandMessage d,
+ ConcurrentHashMap8 remaining) {
+
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id());
+
+ // Check that message was received from expected node.
+ if (!s.senderId().equals(node.id())) {
+ U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+ ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+ return;
+ }
+
+ if (topologyChanged())
+ return;
+
+ if (log.isDebugEnabled())
+ log.debug("Received supply message: " + s);
+
+ GridDhtPartitionSupplyMessage supply = s.supply();
+
+ // Check whether there were class loading errors on unmarshal
+ if (supply.classError() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Class got undeployed during preloading: " + supply.classError());
+
+ return;
+ }
+
+ // Preload.
+ for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+ int p = e.getKey();
+
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+ assert part != null;
+
+ if (part.state() == MOVING) {
+ boolean reserved = part.reserve();
+
+ assert reserved : "Failed to reserve partition [gridName=" +
+ cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+ part.lock();
+
+ try {
+ // Loop through all received entries and try to preload them.
+ for (GridCacheEntryInfo entry : e.getValue().infos()) {
+ if (!part.preloadingPermitted(entry.key(), entry.version())) {
+ if (log.isDebugEnabled())
+ log.debug("Preloading is not permitted for entry due to " +
+ "evictions [key=" + entry.key() +
+ ", ver=" + entry.version() + ']');
+
+ continue;
+ }
+ try {
+ if (!preloadEntry(node, p, entry, topVer)) {
+ if (log.isDebugEnabled())
+ log.debug("Got entries for invalid partition during " +
+ "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+
+ break;
+ }
+ }
+ catch (IgniteCheckedException ex) {
+ cancel();
+
+ return;
+ }
+ }
+
+ boolean last = supply.last().contains(p);
+
+ // If message was last for this partition,
+ // then we take ownership.
+ if (last) {
+ top.own(part);//todo: close future?
+
+// if (logg && cctx.name().equals("cache"))
+// System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
+
+ remaining.remove(p);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished rebalancing partition: " + part);
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+ exchFut.discoveryEvent());
+ }
+ }
+ finally {
+ part.unlock();
+ part.release();
+ }
+ }
+ else {
+ remaining.remove(p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+ }
+ }
+ else {
+ remaining.remove(p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+ }
+ }
+
+ for (Integer miss : s.supply().missed())
+ remaining.remove(miss);
+
+ // Only request partitions based on latest topology version.
+ for (Integer miss : s.supply().missed())
+ if (cctx.affinity().localNode(miss, topVer))
+ missed.add(miss);
+
+ if (!remaining.isEmpty()) {
+ try {
+ // Create copy.
+ GridDhtPartitionDemandMessage nextD =
+ new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+
+ nextD.topic(topic(idx, cctx.cacheId(), node.id()));
+
+ // Send demand message.
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
+ nextD, cctx.ioPolicy(), d.timeout());
+
+ if (logg && cctx.name().equals("cache"))
+ System.out.println("D " + idx + " ack " + cctx.localNode().id());
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+ "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
+
+ cancel();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ try {
+ int rebalanceOrder = cctx.config().getRebalanceOrder();
+
+ if (!CU.isMarshallerCache(cctx.name())) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
+
+ try {
+ cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
+ "[cacheName=" + cctx.name() + ']');
+
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
+ }
+ }
+
+ if (rebalanceOrder > 0) {
+ IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+
+ try {
+ if (fut != null) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
+ ", rebalanceOrder=" + rebalanceOrder + ']');
+
+ fut.get();
+ }
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
+ "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
+ }
+ }
+
+ GridDhtPartitionsExchangeFuture exchFut = null;
+
+ boolean stopEvtFired = false;
+
+ while (!isCancelled()) {
+ try {
+ barrier.await();
+
+ if (id == 0 && exchFut != null && !exchFut.dummy() &&
+ cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
+
+ if (!cctx.isReplicated() || !stopEvtFired) {
+ preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
+ stopEvtFired = true;
+ }
+ }
+ }
+ catch (BrokenBarrierException ignore) {
+ throw new InterruptedException("Demand worker stopped.");
+ }
+
+ // Sync up all demand threads at this step.
+ GridDhtPreloaderAssignments assigns = null;
+
+ while (assigns == null)
+ assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
+
+ demandLock.readLock().lock();
+
+ try {
+ exchFut = assigns.exchangeFuture();
+
+ // Assignments are empty if preloading is disabled.
+ if (assigns.isEmpty())
+ continue;
+
+ boolean resync = false;
+
+ // While.
+ // =====
+ while (!isCancelled() && !topologyChanged() && !resync) {
+ Collection<Integer> missed = new HashSet<>();
+
+ // For.
+ // ===
+ for (ClusterNode node : assigns.keySet()) {
+ if (topologyChanged() || isCancelled())
+ break; // For.
+
+ GridDhtPartitionDemandMessage d = assigns.remove(node);
+
+ // If another thread is already processing this message,
+ // move to the next node.
+ if (d == null)
+ continue; // For.
+
+ try {
+ Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
+
+ if (!set.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
+ set + ']');
+
+ missed.addAll(set);
+ }
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw e;
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+ ", msg=" + e.getMessage() + ']');
+
+ resync = true;
+
+ break; // For.
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+ "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+ }
+ }
+
+ // Processed missed entries.
+ if (!missed.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Reassigning partitions that were missed: " + missed);
+
+ assert exchFut.exchangeId() != null;
+
+ cctx.shared().exchange().forceDummyExchange(true, exchFut);
+ }
+ else
+ break; // While.
+ }
+ }
+ finally {
+ demandLock.readLock().unlock();
+
+ syncFut.onWorkerDone(this);
+ }
+
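+ // Resend the local partition map so other nodes learn about newly owned partitions.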
+ cctx.shared().exchange().scheduleResendPartitions();
+ }
+ }
+ finally {
+ // Safety.
+ syncFut.onWorkerDone(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DemandWorker.class, this, "assignQ", assignQ, "super", super.toString());
+ }
+ }
+
+ /**
+ * Sets last exchange future.
+ *
+ * @param lastFut Last future to set.
+ */
+ void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+ lastExchangeFut = lastFut;
+ }
+
+ /**
+ * @param exchFut Exchange future.
+ * @return Assignments of partitions to nodes.
+ */
+ GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+ // No assignments for disabled preloader.
+ GridDhtPartitionTopology top = cctx.dht().topology();
+
+ if (!cctx.rebalanceEnabled())
+ return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+ int partCnt = cctx.affinity().partitions();
+
+ assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+ exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+ "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+ ", topVer=" + top.topologyVersion() + ']';
+
+ GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+ AffinityTopologyVersion topVer = assigns.topologyVersion();
+
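+ // Build one demand message per supplier node, covering every local partition that is still MOVING.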
+ for (int p = 0; p < partCnt; p++) {
+ if (cctx.shared().exchange().hasPendingExchange()) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+ exchFut.exchangeId());
+
+ break;
+ }
+
+ // If partition belongs to local node.
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+ assert part != null;
+ assert part.id() == p;
+
+ if (part.state() != MOVING) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+ continue; // For.
+ }
+
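+ // Current owners of the partition that are able to supply its data.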
+ Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+ if (picked.isEmpty()) {
+ top.own(part);
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+ DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+ cctx.events().addPreloadEvent(p,
+ EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+ discoEvt.type(), discoEvt.timestamp());
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Owning partition as there are no other owners: " + part);
+ }
+ else {
+ ClusterNode n = F.first(picked);
+
+ GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+ if (msg == null) {
+ assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+ top.updateSequence(),
+ exchFut.exchangeId().topologyVersion(),
+ cctx.cacheId()));
+ }
+
+ msg.addPartition(p);
+ }
+ }
+ }
+
+ return assigns;
+ }
+
+ /**
+ * Completes once every demand worker has finished a full partition iteration.
+ */
+ private class SyncFuture extends GridFutureAdapter<Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Remaining workers. */
+ private Collection<DemandWorker> remaining;
+
+ /**
+ * @param workers List of workers.
+ */
+ private SyncFuture(Collection<DemandWorker> workers) {
+ assert workers.size() == poolSize();
+
+ remaining = Collections.synchronizedList(new LinkedList<>(workers));
+ }
+
+ /**
+ * @param w Worker who iterated through all partitions.
+ */
+ void onWorkerDone(DemandWorker w) {
+ if (isDone())
+ return;
+
+ if (remaining.remove(w)) {
+ if (log.isDebugEnabled())
+ log.debug("Completed full partition iteration for worker [worker=" + w + ']');
+ }
+
+ if (remaining.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Completed sync future.");
+
+ onDone();
+ }
+ }
+ }
+
+ /**
+ * Supply message wrapper.
+ */
+ private static class SupplyMessage {
+ /** Sender ID. */
+ private UUID sndId;
+
+ /** Supply message. */
+ private GridDhtPartitionSupplyMessage supply;
+
+ /**
+ * Dummy constructor.
+ */
+ private SupplyMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param sndId Sender ID.
+ * @param supply Supply message.
+ */
+ SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+ this.sndId = sndId;
+ this.supply = supply;
+ }
+
+ /**
+ * @return Sender ID.
+ */
+ UUID senderId() {
+ return sndId;
+ }
+
+ /**
+ * @return Message.
+ */
+ GridDhtPartitionSupplyMessage supply() {
+ return supply;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SupplyMessage.class, this);
+ }
+ }
+}
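For context on the ordered-rebalance wait above: a cache with rebalance order N does not start demanding partitions until every cache with a smaller order has finished rebalancing. Below is a minimal illustrative sketch of driving that ordering through the public configuration API; the cache names and the example class are hypothetical and not part of this commit.

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class OrderedRebalanceExample {
    public static void main(String[] args) {
        // Rebalanced first: order 1.
        CacheConfiguration<Integer, String> first = new CacheConfiguration<>("first");
        first.setCacheMode(CacheMode.PARTITIONED);
        first.setRebalanceMode(CacheRebalanceMode.SYNC); // Rebalance order only applies to SYNC/ASYNC modes.
        first.setRebalanceOrder(1);

        // Waits for "first" to finish before its own rebalancing starts: order 2.
        CacheConfiguration<Integer, String> second = new CacheConfiguration<>("second");
        second.setCacheMode(CacheMode.PARTITIONED);
        second.setRebalanceMode(CacheRebalanceMode.SYNC);
        second.setRebalanceOrder(2);

        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setCacheConfiguration(first, second);

        try (Ignite ignite = Ignition.start(cfg)) {
            // Both caches are available once the node is started.
            ignite.cache("first").put(1, "a");
            ignite.cache("second").put(1, "b");
        }
    }
}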
[8/8] incubator-ignite git commit: ignite-1093
Posted by av...@apache.org.
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/50e188df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/50e188df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/50e188df
Branch: refs/heads/ignite-1093
Commit: 50e188df2d20e7fe889e26ce576b55d6192d8ab4
Parents: 6a733ef
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 11 19:20:14 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 11 19:20:14 2015 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/preloader/GridDhtPartitionDemander.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50e188df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 30a04c0..f6a33c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -731,6 +731,8 @@ private class SyncFuture extends GridFutureAdapter<Object> {
missed.clear();
+ cctx.shared().exchange().scheduleResendPartitions(); // TODO: Is it necessary?
+
onDone();
}
}