You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/05 17:31:54 UTC
incubator-ignite git commit: ignite-1093 Parallel supplyPool
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1093 8341c6d76 -> 4ed77e375
ignite-1093 Parallel supplyPool
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4ed77e37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4ed77e37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4ed77e37
Branch: refs/heads/ignite-1093
Commit: 4ed77e37529e34795a3b581010be3e0f35c8fe6e
Parents: 8341c6d
Author: Anton Vinogradov <av...@gridgain.com>
Authored: Wed Aug 5 18:31:34 2015 +0300
Committer: Anton Vinogradov <av...@gridgain.com>
Committed: Wed Aug 5 18:31:34 2015 +0300
----------------------------------------------------------------------
.../preloader/GridDhtPartitionDemandPool.java | 78 +--
.../preloader/GridDhtPartitionSupplyPool.java | 577 ++++++++++---------
.../GridCacheMassiveRebalancingSelfTest.java | 115 +++-
3 files changed, 432 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ed77e37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index d9d871f..0e0bc01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -424,22 +424,6 @@ public class GridDhtPartitionDemandPool {
}
/**
- * @param timeout Timed out value.
- */
- private void growTimeout(long timeout) {
- long newTimeout = (long)(timeout * 1.5D);
-
- // Account for overflow.
- if (newTimeout < 0)
- newTimeout = Long.MAX_VALUE;
-
- // Grow by 50% only if another thread didn't do it already.
- if (GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout))
- U.warn(log, "Increased rebalancing message timeout from " + timeout + "ms to " +
- newTimeout + "ms.");
- }
-
- /**
* @param pick Node picked for preloading.
* @param p Partition.
* @param entry Preloaded entry.
@@ -554,7 +538,7 @@ public class GridDhtPartitionDemandPool {
return missed;
for (int p : d.partitions()) {
- cctx.io().addOrderedHandler(topic(p, node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ cctx.io().addOrderedHandler(topic(p, topVer.topologyVersion()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
@Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
handleSupplyMessage(new SupplyMessage(nodeId, msg), node, topVer, top, remaining,
exchFut, missed, d);
@@ -565,26 +549,25 @@ public class GridDhtPartitionDemandPool {
try {
Iterator<Integer> it = remaining.keySet().iterator();
- final int maxC = Runtime.getRuntime().availableProcessors() / 2; //Todo: make param
+ final int maxC = cctx.config().getRebalanceThreadPoolSize();
int sent = 0;
while (sent < maxC && it.hasNext()) {
int p = it.next();
- Collection<Integer> ps = Collections.singleton(p);
-
boolean res = remaining.replace(p, false, true);
assert res;
// Create copy.
- GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, ps);
+ GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
- initD.topic(topic(p, node.id()));
+ initD.topic(topic(p, topVer.topologyVersion()));
// Send initial demand message.
- cctx.io().send(node, initD, cctx.ioPolicy());
+ cctx.io().sendOrderedMessage(node,
+ GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
sent++;
}
@@ -598,17 +581,17 @@ public class GridDhtPartitionDemandPool {
}
finally {
for (int p : d.partitions())
- cctx.io().removeOrderedHandler(topic(p, node.id()));
+ cctx.io().removeOrderedHandler(topic(p, topVer.topologyVersion()));
}
}
/**
- * @param p Partition.
- * @param id remote node id.
+ * @param p Partition.
+ * @param topVer Topology version.
* @return topic
*/
- private Object topic(int p, UUID id) {
- return TOPIC_CACHE.topic("Preloading", id, cctx.cacheId(), p);
+ private Object topic(int p, long topVer) {
+ return TOPIC_CACHE.topic("DemandPool" + topVer, cctx.cacheId(), p);//Todo topVer as long
}
/**
@@ -631,7 +614,7 @@ public class GridDhtPartitionDemandPool {
Set<Integer> missed,
GridDhtPartitionDemandMessage d) {
- //Todo: check and remove
+ //Todo: check whether this is still needed and remove
// Check that message was received from expected node.
if (!s.senderId().equals(node.id())) {
U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
@@ -640,6 +623,9 @@ public class GridDhtPartitionDemandPool {
return;
}
+ if (topologyChanged())
+ return;
+
if (log.isDebugEnabled())
log.debug("Received supply message: " + s);
@@ -715,7 +701,26 @@ public class GridDhtPartitionDemandPool {
remaining.remove(p);
- demandNextPartition(node, remaining, d);
+ demandNextPartition(node, remaining, d, topVer);
+ }
+ else {
+ try {
+ // Create copy.
+ GridDhtPartitionDemandMessage nextD =
+ new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+
+ nextD.topic(topic(p, topVer.topologyVersion()));
+
+ // Send demand message.
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
+ nextD, cctx.ioPolicy(), d.timeout());
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+ "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
+
+ cancel();
+ }
}
}
finally {
@@ -751,24 +756,25 @@ public class GridDhtPartitionDemandPool {
* @param node Node.
* @param remaining Remaining.
* @param d initial DemandMessage.
+ * @param topVer Topology version.
*/
private void demandNextPartition(
final ClusterNode node,
final ConcurrentHashMap8<Integer, Boolean> remaining,
- final GridDhtPartitionDemandMessage d
+ final GridDhtPartitionDemandMessage d,
+ final AffinityTopologyVersion topVer
) {
try {
for (Integer p : remaining.keySet()) {
if (remaining.replace(p, false, true)) {
- Collection<Integer> nextPs = Collections.singleton(p);
-
// Create copy.
- GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, nextPs);
+ GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
- nextD.topic(topic(p, node.id()));
+ nextD.topic(topic(p, topVer.topologyVersion()));
// Send demand message.
- cctx.io().send(node, nextD, cctx.ioPolicy());
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
+ nextD, cctx.ioPolicy(), d.timeout());
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ed77e37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 42d6bb2..f10837a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -19,25 +19,22 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
+import org.jsr166.*;
import java.io.*;
import java.util.*;
-import java.util.concurrent.*;
import java.util.concurrent.locks.*;
-import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.internal.GridTopic.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
/**
@@ -51,23 +48,20 @@ class GridDhtPartitionSupplyPool {
private final IgniteLogger log;
/** */
- private final ReadWriteLock busyLock;
-
- /** */
private GridDhtPartitionTopology top;
/** */
- private final Collection<SupplyWorker> workers = new LinkedList<>();
-
- /** */
- private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
-
- /** */
private final boolean depEnabled;
/** Preload predicate. */
private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+ /** Supply context map. */
+ private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+ /** Done map. */
+ private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();//Todo: refactor
+
/**
* @param cctx Cache context.
* @param busyLock Shutdown lock.
@@ -77,43 +71,42 @@ class GridDhtPartitionSupplyPool {
assert busyLock != null;
this.cctx = cctx;
- this.busyLock = busyLock;
log = cctx.logger(getClass());
top = cctx.dht().topology();
if (!cctx.kernalContext().clientNode()) {
- int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
- for (int i = 0; i < poolSize; i++)
- workers.add(new SupplyWorker());
-
- cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- processDemandMessage(id, m);
- }
- });
+ for (int p = 0; p <= cctx.affinity().partitions(); p++)
+ cctx.io().addOrderedHandler(topic(p, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ processMessage(m, id);
+ }
+ });
}
depEnabled = cctx.gridDeploy().enabled();
}
/**
+ * @param p Partition.
+ * @param id Cache ID.
+ * @return Topic.
+ */
+ static Object topic(int p, int id) {
+ return TOPIC_CACHE.topic("SupplyPool", id, p);
+ }
+
+ /**
*
*/
void start() {
- for (SupplyWorker w : workers)
- new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start();
}
/**
*
*/
void stop() {
- U.cancel(workers);
- U.join(workers, log);
-
top = null;
}
@@ -127,164 +120,85 @@ class GridDhtPartitionSupplyPool {
}
/**
- * @return Size of this thread pool.
+ * @param d Demand message.
+ * @param id Node uuid.
*/
- int poolSize() {
- return cctx.config().getRebalanceThreadPoolSize();
- }
+ private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
+ assert d != null;
+ assert id != null;
- /**
- * @return {@code true} if entered to busy state.
- */
- private boolean enterBusy() {
- if (busyLock.readLock().tryLock())
- return true;
+ GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+ d.updateSequence(), cctx.cacheId());
- if (log.isDebugEnabled())
- log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
+ long preloadThrottle = cctx.config().getRebalanceThrottle();
- return false;
- }
+ long maxBatchesCnt = 3;//Todo: param
- /**
- * @param nodeId Sender node ID.
- * @param d Message.
- */
- private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) {
- if (!enterBusy())
- return;
+ ClusterNode node = cctx.discovery().node(id);
+
+ boolean ack = false;
+
+ T2<UUID, Object> scId = new T2<>(id, d.topic());
try {
- if (cctx.rebalanceEnabled()) {
- if (log.isDebugEnabled())
- log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']');
+ SupplyContext sctx = scMap.remove(scId);
- queue.offer(new DemandMessage(nodeId, d));
- }
- else
- U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d);
- }
- finally {
- leaveBusy();
- }
- }
+ if (doneMap.get(scId) != null)//Todo: refactor
+ return;
- /**
- *
- */
- private void leaveBusy() {
- busyLock.readLock().unlock();
- }
+ long bCnt = 0;
- /**
- * @param deque Deque to poll from.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(2000, MILLISECONDS);
- }
+ int phase = 0;
- /**
- * Supply work.
- */
- private class SupplyWorker extends GridWorker {
- /** Hide worker logger and use cache logger. */
- private IgniteLogger log = GridDhtPartitionSupplyPool.this.log;
+ if (sctx != null)
+ phase = sctx.phase;
- /**
- * Default constructor.
- */
- private SupplyWorker() {
- super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log);
- }
+ Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- DemandMessage msg = poll(queue, this);
+ while (sctx != null || partIt.hasNext()) {
+ int part = sctx != null ? sctx.part : partIt.next();
- if (msg == null)
- continue;
+ GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
- ClusterNode node = cctx.discovery().node(msg.senderId());
+ if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+ // Reply with partition of "-1" to let sender know that
+ // this node is no longer an owner.
+ s.missed(part);
- if (node == null) {
if (log.isDebugEnabled())
- log.debug("Received message from non-existing node (will ignore): " + msg);
+ log.debug("Requested partition is not owned by local node [part=" + part +
+ ", demander=" + id + ']');
continue;
}
- processMessage(msg, node);
- }
- }
+ GridCacheEntryInfoCollectSwapListener swapLsnr = null;
- /**
- * @param msg Message.
- * @param node Demander.
- */
- private void processMessage(DemandMessage msg, ClusterNode node) {
- assert msg != null;
- assert node != null;
-
- GridDhtPartitionDemandMessage d = msg.message();
-
- GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId());
-
- long preloadThrottle = cctx.config().getRebalanceThrottle();
-
- boolean ack = false;
+ try {
+ if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+ swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
- try {
- for (int part : d.partitions()) {
- GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
- if (loc == null || loc.state() != OWNING || !loc.reserve()) {
- // Reply with partition of "-1" to let sender know that
- // this node is no longer an owner.
- s.missed(part);
-
- if (log.isDebugEnabled())
- log.debug("Requested partition is not owned by local node [part=" + part +
- ", demander=" + msg.senderId() + ']');
-
- continue;
+ cctx.swap().addOffHeapListener(part, swapLsnr);
+ cctx.swap().addSwapListener(part, swapLsnr);
}
- GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
- try {
- if (cctx.isSwapOrOffheapEnabled()) {
- swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+ boolean partMissing = false;
- cctx.swap().addOffHeapListener(part, swapLsnr);
- cctx.swap().addSwapListener(part, swapLsnr);
- }
+ if (phase == 0)
+ phase = 1;
- boolean partMissing = false;
+ if (phase == 1) {
+ Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+ (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
- for (GridCacheEntryEx e : loc.entries()) {
+ while (entIt.hasNext()) {
if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
// Demander no longer needs this partition, so we send '-1' partition and move on.
s.missed(part);
if (log.isDebugEnabled())
log.debug("Demanding node does not need requested partition [part=" + part +
- ", nodeId=" + msg.senderId() + ']');
+ ", nodeId=" + id + ']');
partMissing = true;
@@ -301,10 +215,21 @@ class GridDhtPartitionSupplyPool {
if (preloadThrottle > 0)
U.sleep(preloadThrottle);
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId());
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+
+ swapLsnr = null;
+
+ return;
+ }
+ else {
+ s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+ cctx.cacheId());
+ }
}
+ GridCacheEntryEx e = entIt.next();
+
GridCacheEntryInfo info = e.info();
if (info != null && !info.isNew()) {
@@ -319,188 +244,228 @@ class GridDhtPartitionSupplyPool {
if (partMissing)
continue;
- if (cctx.isSwapOrOffheapEnabled()) {
- GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
- cctx.swap().iterator(part);
+ }
- // Iterator may be null if space does not exist.
- if (iter != null) {
- try {
- boolean prepared = false;
+ if (phase == 1)
+ phase = 2;
- for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
+ if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+ GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
+ (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+ cctx.swap().iterator(part);
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + msg.senderId() + ']');
+ // Iterator may be null if space does not exist.
+ if (iter != null) {
+ try {
+ boolean prepared = false;
- partMissing = true;
+ while (iter.hasNext()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
- break; // For.
- }
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
+ partMissing = true;
- if (!reply(node, d, s))
- return;
+ break; // For.
+ }
- // Throttle preloading.
- if (preloadThrottle > 0)
- U.sleep(preloadThrottle);
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ ack = true;
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId());
- }
+ if (!reply(node, d, s))
+ return;
- GridCacheSwapEntry swapEntry = e.getValue();
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
- GridCacheEntryInfo info = new GridCacheEntryInfo();
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
- info.keyBytes(e.getKey());
- info.ttl(swapEntry.ttl());
- info.expireTime(swapEntry.expireTime());
- info.version(swapEntry.version());
- info.value(swapEntry.value());
+ swapLsnr = null;
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry0(part, info, cctx);
+ return;
+ }
else {
- if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not send " +
- "cache entry): " + info);
-
- continue;
+ s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+ cctx.cacheId());
}
+ }
- // Need to manually prepare cache message.
- if (depEnabled && !prepared) {
- ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
- swapEntry.valueClassLoaderId() != null ?
- cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
- null;
+ Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
- if (ldr == null)
- continue;
+ GridCacheSwapEntry swapEntry = e.getValue();
- if (ldr instanceof GridDeploymentInfo) {
- s.prepare((GridDeploymentInfo)ldr);
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
- prepared = true;
- }
- }
- }
+ info.keyBytes(e.getKey());
+ info.ttl(swapEntry.ttl());
+ info.expireTime(swapEntry.expireTime());
+ info.version(swapEntry.version());
+ info.value(swapEntry.value());
+
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry0(part, info, cctx);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not send " +
+ "cache entry): " + info);
- if (partMissing)
continue;
+ }
+
+ // Need to manually prepare cache message.
+ if (depEnabled && !prepared) {
+ ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+ swapEntry.valueClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+ null;
+
+ if (ldr == null)
+ continue;
+
+ if (ldr instanceof GridDeploymentInfo) {
+ s.prepare((GridDeploymentInfo)ldr);
+
+ prepared = true;
+ }
+ }
}
- finally {
- iter.close();
- }
+
+ if (partMissing)
+ continue;
+ }
+ finally {
+ iter.close();
}
}
+ }
- // Stop receiving promote notifications.
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
+ if (swapLsnr == null && sctx != null)
+ swapLsnr = sctx.swapLsnr;
- if (swapLsnr != null) {
- Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+ // Stop receiving promote notifications.
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
- swapLsnr = null;
+ if (phase == 2)
+ phase = 3;
- for (GridCacheEntryInfo info : entries) {
- if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
- // Demander no longer needs this partition,
- // so we send '-1' partition and move on.
- s.missed(part);
+ if (phase == 3 && swapLsnr != null) {
+ Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
- if (log.isDebugEnabled())
- log.debug("Demanding node does not need requested partition " +
- "[part=" + part + ", nodeId=" + msg.senderId() + ']');
+ swapLsnr = null;
- // No need to continue iteration over swap entries.
- break;
- }
+ Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
+ (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+ while (lsnrIt.hasNext()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
- ack = true;
+ // No need to continue iteration over swap entries.
+ break;
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ ack = true;
- if (!reply(node, d, s))
- return;
+ if (!reply(node, d, s))
+ return;
- s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(),
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
+
+ if (++bCnt >= maxBatchesCnt) {
+ saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
+
+ return;
+ }
+ else {
+ s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
cctx.cacheId());
}
-
- if (preloadPred == null || preloadPred.apply(info))
- s.addEntry(part, info, cctx);
- else if (log.isDebugEnabled())
- log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
- info);
}
- }
- // Mark as last supply message.
- s.last(part);
+ GridCacheEntryInfo info = lsnrIt.next();
-// if (ack) {
-// s.markAck();
-//
-// break; // Partition for loop.
-// }
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry(part, info, cctx);
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+ info);
+ }
}
- finally {
- loc.release();
- if (swapLsnr != null) {
- cctx.swap().removeOffHeapListener(part, swapLsnr);
- cctx.swap().removeSwapListener(part, swapLsnr);
- }
+ // Mark as last supply message.
+ s.last(part);
+
+ if (ack) {
+ s.markAck();
+
+ break; // Partition for loop.
}
}
+ finally {
+ loc.release();
- reply(node, d, s);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
+ }
}
+
+ reply(node, d, s);
+
+ doneMap.put(scId, true);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partition supply message to node: " + id, e);
}
+ }
- /**
- * @param n Node.
- * @param d Demand message.
- * @param s Supply message.
- * @return {@code True} if message was sent, {@code false} if recipient left grid.
- * @throws IgniteCheckedException If failed.
- */
- private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
- throws IgniteCheckedException {
- try {
- if (log.isDebugEnabled())
- log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+ /**
+ * @param n Node.
+ * @param d Demand message.
+ * @param s Supply message.
+ * @return {@code True} if message was sent, {@code false} if recipient left grid.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+ throws IgniteCheckedException {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
- cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+ cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
- return true;
- }
- catch (ClusterTopologyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Failed to send partition supply message because node left grid: " + n.id());
+ return true;
+ }
+ catch (ClusterTopologyCheckedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partition supply message because node left grid: " + n.id());
- return false;
- }
+ return false;
}
}
+
/**
* Demand message wrapper.
*/
@@ -542,4 +507,42 @@ class GridDhtPartitionSupplyPool {
return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
}
}
+
+
+ /**
+ * @param t Supply context key (demander node ID and demand topic).
+ * @param phase Phase at which supplying was suspended.
+ * @param partIt Partition iterator.
+ * @param entryIt Entry iterator.
+ * @param swapLsnr Swap listener.
+ */
+ private void saveSupplyContext(
+ T2 t,
+ int phase,
+ Iterator<Integer> partIt,
+ int part,
+ Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){
+ scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+ }
+
+ private static class SupplyContext{
+ private int phase;
+
+ private Iterator<Integer> partIt;
+
+ private Iterator<?> entryIt;
+
+ private GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+ int part;
+
+ public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
+ GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+ this.phase = phase;
+ this.partIt = partIt;
+ this.entryIt = entryIt;
+ this.swapLsnr = swapLsnr;
+ this.part = part;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ed77e37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index e90b7af..11ea8f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -20,12 +20,13 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.junits.common.*;
+import java.util.concurrent.atomic.*;
+
/**
*
*/
@@ -38,6 +39,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
/** cache name. */
protected static String CACHE_NAME_DHT = "cache";
+ /** {@inheritDoc} */
@Override protected long getTestTimeout() {
return Long.MAX_VALUE;
}
@@ -51,10 +53,14 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+ if (getTestGridName(3).equals(gridName))
+ iCfg.setClientMode(true);
+
cacheCfg.setName(CACHE_NAME_DHT);
cacheCfg.setCacheMode(CacheMode.PARTITIONED);
- //cacheCfg.setRebalanceBatchSize(1024);
+ cacheCfg.setRebalanceBatchSize(100 * 1024);
cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheCfg.setRebalanceThreadPoolSize(4);
//cacheCfg.setRebalanceTimeout(1000000);
cacheCfg.setBackups(1);
@@ -63,11 +69,9 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
}
/**
- * @throws Exception
+ * @param ignite Ignite.
*/
- public void testMassiveRebalancing() throws Exception {
- Ignite ignite = startGrid(0);
-
+ private void generateData(Ignite ignite) {
try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
for (int i = 0; i < TEST_SIZE; i++) {
if (i % 1_000_000 == 0)
@@ -76,12 +80,34 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
stmr.addData(i, i);
}
}
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void checkData(Ignite ignite) throws IgniteCheckedException {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Checked " + i / 1_000_000 + "m entries.");
+
+ assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMassiveRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite);
log.info("Preloading started.");
long start = System.currentTimeMillis();
- // startGrid(1);
+ //startGrid(1);
startGrid(2);
@@ -89,19 +115,78 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
stopGrid(0);
- // Thread.sleep(10000);
+ //Thread.sleep(20000);
- // stopGrid(1);
+ //stopGrid(1);
- for (int i = 0; i < TEST_SIZE; i++) {
- if (i % 1_000_000 == 0)
- log.info("Checked " + i / 1_000_000 + "m entries.");
-
- assert grid(2).cachex(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
- }
+ checkData(grid(2));
log.info("Spend " + spend + " seconds to preload entries.");
stopAllGrids();
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOpPerSecRebalancingTest() throws Exception {
+ startGrid(0);
+
+ final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+ generateData(grid(0));
+
+ startGrid(1);
+ startGrid(2);
+ startGrid(3);
+
+ Thread t = new Thread(new Runnable() {
+ @Override public void run() {
+
+ long spend = 0;
+
+ long ops = 0;
+
+ while (!cancelled.get()) {
+ try {
+ long start = System.currentTimeMillis();
+
+ int size = 1000;
+
+ for (int i = 0; i < size; i++)
+ grid(3).cachex(CACHE_NAME_DHT).remove(i);
+
+ for (int i = 0; i < size; i++)
+ grid(3).cachex(CACHE_NAME_DHT).put(i, i);
+
+ spend += System.currentTimeMillis() - start;
+
+ ops += size * 2;
+ }
+ catch (IgniteCheckedException e) {
+ e.printStackTrace();
+ }
+
+ log.info("Ops. per ms: " + ops / spend);
+ }
+ }
+ });
+ t.start();
+
+ stopGrid(0);
+ startGrid(0);
+
+ stopGrid(0);
+ startGrid(0);
+
+ stopGrid(0);
+ startGrid(0);
+
+ cancelled.set(true);
+ t.join();
+
+ checkData(grid(3));
+
+ //stopAllGrids();
+ }
}
\ No newline at end of file