You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 18:59:57 UTC
[1/4] ignite git commit: Release notes updated
Repository: ignite
Updated Branches:
refs/heads/ignite-1093-2 a5fc6f35b -> fdfa62f0f
Release notes updated
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e52367bc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e52367bc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e52367bc
Branch: refs/heads/ignite-1093-2
Commit: e52367bc0a57028d86f356101cfa46cd70e35e12
Parents: a5fc6f3
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 13:16:34 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 13:16:34 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 166 +++++++++----------
1 file changed, 79 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e52367bc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 2e54294..31e2e5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -68,11 +68,11 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
@@ -141,7 +141,7 @@ public class GridDhtPartitionDemander {
*
*/
void stop() {
- syncFut.onCancel();
+ syncFut.cancel();
lastExchangeFut = null;
@@ -222,7 +222,7 @@ public class GridDhtPartitionDemander {
if (!topologyChanged(fut.assigns.topologyVersion()))
wFut.get();
else {
- fut.onCancel();
+ fut.cancel();
return;
}
@@ -231,13 +231,13 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled()) {
log.debug("Failed to wait for " + name + " cache rebalancing future (grid is stopping): " +
"[cacheName=" + cctx.name() + ']');
- fut.onCancel();
+ fut.cancel();
return;
}
}
catch (IgniteCheckedException e) {
- fut.onCancel();
+ fut.cancel();
throw new Error("Ordered rebalancing future should never fail: " + e.getMessage(), e);
}
@@ -264,7 +264,7 @@ public class GridDhtPartitionDemander {
if (fut.isInited()) {
if (!fut.isDone())
- fut.onCancel();
+ fut.cancel();
fut = new SyncFuture(assigns, cctx, log, false);
@@ -284,7 +284,7 @@ public class GridDhtPartitionDemander {
}
if (topologyChanged(topVer)) {
- fut.onCancel();
+ fut.cancel();
return;
}
@@ -315,7 +315,7 @@ public class GridDhtPartitionDemander {
if (!topologyChanged(topVer))
oFut.get();
else {
- curFut.onCancel();
+ curFut.cancel();
return;
}
@@ -325,13 +325,13 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled()) {
log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
"[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
- curFut.onCancel();
+ curFut.cancel();
return;
}
}
catch (IgniteCheckedException e) {
- curFut.onCancel();
+ curFut.cancel();
throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
}
@@ -379,7 +379,7 @@ public class GridDhtPartitionDemander {
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
if (topologyChanged(topVer)) {
- fut.onCancel();
+ fut.cancel();
return;
}
@@ -393,10 +393,6 @@ public class GridDhtPartitionDemander {
final CacheConfiguration cfg = cctx.config();
- final long start = U.currentTimeMillis();
-
- fut.logStart(node.id(), start);
-
U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
@@ -406,7 +402,7 @@ public class GridDhtPartitionDemander {
remainings.addAll(d.partitions());
- fut.append(node.id(), remainings);
+ fut.appendPartitions(node.id(), remainings);
int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
@@ -434,10 +430,10 @@ public class GridDhtPartitionDemander {
if (!topologyChanged(topVer))
cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
else
- fut.onCancel();
+ fut.cancel();
}
catch (IgniteCheckedException ex) {
- fut.onCancel();
+ fut.cancel();
U.error(log, "Failed to send partition demand message to node", ex);
}
@@ -450,7 +446,7 @@ public class GridDhtPartitionDemander {
else {
DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
- fut.append(node.id(), d.partitions());
+ fut.appendPartitions(node.id(), d.partitions());
dw.run(node, d);
}
@@ -522,7 +518,7 @@ public class GridDhtPartitionDemander {
return;
if (topologyChanged(topVer)) {
- fut.onCancel();
+ fut.cancel();
return;
}
@@ -539,7 +535,7 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled())
log.debug("Class got undeployed during preloading: " + supply.classError());
- fut.onCancel(id);
+ fut.cancel(id);
return;
}
@@ -550,7 +546,7 @@ public class GridDhtPartitionDemander {
// Preload.
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
if (topologyChanged(topVer)) {
- fut.onCancel();
+ fut.cancel();
return;
}
@@ -597,7 +593,7 @@ public class GridDhtPartitionDemander {
if (last) {
top.own(part);
- fut.onPartitionDone(id, p);
+ fut.partitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Finished rebalancing partition: " + part);
@@ -609,14 +605,14 @@ public class GridDhtPartitionDemander {
}
}
else {
- fut.onPartitionDone(id, p);
+ fut.partitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
}
}
else {
- fut.onPartitionDone(id, p);
+ fut.partitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
@@ -626,10 +622,10 @@ public class GridDhtPartitionDemander {
// Only request partitions based on latest topology version.
for (Integer miss : supply.missed())
if (cctx.affinity().localNode(miss, topVer))
- fut.onMissedPartition(id, miss);
+ fut.partitionMissed(id, miss);
for (Integer miss : supply.missed())
- fut.onPartitionDone(id, miss);
+ fut.partitionDone(id, miss);
if (!fut.isDone()) {
GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
@@ -647,7 +643,7 @@ public class GridDhtPartitionDemander {
nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
}
else
- fut.onCancel();
+ fut.cancel();
}
}
}
@@ -655,13 +651,13 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled())
log.debug("Node left during rebalancing [node=" + node.id() +
", msg=" + e.getMessage() + ']');
- fut.onCancel();
+ fut.cancel();
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to receive partitions from node (rebalancing will not " +
"fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
- fut.onCancel(node.id());
+ fut.cancel(node.id());
}
}
@@ -767,6 +763,9 @@ public class GridDhtPartitionDemander {
/** */
private static final long serialVersionUID = 1L;
+ /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
+ private final boolean sendStopEvnt;
+
/** */
private final GridCacheContext<?, ?> cctx;
@@ -774,13 +773,10 @@ public class GridDhtPartitionDemander {
private final IgniteLogger log;
/** Remaining. */
- private final ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
+ private final Map<UUID, IgniteBiTuple<Long, Collection<Integer>>> remaining = new HashMap<>();
/** Missed. */
- private final ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
-
- /** Started time. */
- private final ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
+ private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
/** Lock. */
private final Lock lock = new ReentrantLock();
@@ -791,12 +787,12 @@ public class GridDhtPartitionDemander {
/** Completed. */
private volatile boolean completed = true;
- private final boolean sendStopEvnt;
-
/**
* @param assigns Assigns.
*/
- SyncFuture(GridDhtPreloaderAssignments assigns, GridCacheContext<?, ?> cctx, IgniteLogger log,
+ SyncFuture(GridDhtPreloaderAssignments assigns,
+ GridCacheContext<?, ?> cctx,
+ IgniteLogger log,
boolean sentStopEvnt) {
this.assigns = assigns;
this.cctx = cctx;
@@ -814,13 +810,13 @@ public class GridDhtPartitionDemander {
/**
* @param assigns Assigns.
*/
- void init(GridDhtPreloaderAssignments assigns) {
+ private void init(GridDhtPreloaderAssignments assigns) {
this.assigns = assigns;
cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
new CI1<IgniteInternalFuture<Long>>() {
@Override public void apply(IgniteInternalFuture<Long> future) {
- SyncFuture.this.onCancel();
+ SyncFuture.this.cancel();
}
});
}
@@ -828,7 +824,7 @@ public class GridDhtPartitionDemander {
/**
* @return Initialised or not.
*/
- boolean isInited() {
+ private boolean isInited() {
return assigns != null;
}
@@ -836,24 +832,21 @@ public class GridDhtPartitionDemander {
* @param nodeId Node id.
* @param parts Parts.
*/
- void append(UUID nodeId, Collection<Integer> parts) {
- remaining.put(nodeId, parts);
-
- missed.put(nodeId, new GridConcurrentHashSet<Integer>());
- }
+ private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
+ lock.lock();
- /**
- * @param nodeId Node id.
- * @param time Time.
- */
- void logStart(UUID nodeId, long time) {
- started.put(nodeId, time);
+ try {
+ remaining.put(nodeId, new IgniteBiTuple<>(System.currentTimeMillis(), parts));
+ }
+ finally {
+ lock.unlock();
+ }
}
/**
* @param node Node.
*/
- GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
+ private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
if (isDone())
return null;
@@ -861,47 +854,51 @@ public class GridDhtPartitionDemander {
}
/**
+ * Cancels this future.
*
+ * @return {@code true}.
*/
- void onCancel() {
+ @Override public boolean cancel() {
lock.lock();
+
try {
if (isDone())
- return;
+ return true;
remaining.clear();
completed = false;
- U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing from all nodes [cache=" + cctx.name()
- + ", topology=" + topologyVersion() +
- ", time=" +
- (started.isEmpty() ? 0 : (U.currentTimeMillis() - Collections.min(started.values()))) + " ms]");
+ U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+ + ", topology=" + topologyVersion());
checkIsDone();
}
finally {
lock.unlock();
}
+
+ return true;
}
/**
* @param nodeId Node id.
*/
- void onCancel(UUID nodeId) {
+ private void cancel(UUID nodeId) {
lock.lock();
+
try {
if (isDone())
return;
+ U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+ ", from node=" + nodeId + ", topology=" + topologyVersion() +
+ ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+
remaining.remove(nodeId);
completed = false;
- U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
- ", from node=" + nodeId + ", topology=" + topologyVersion() +
- ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
-
checkIsDone();
}
finally {
@@ -911,18 +908,12 @@ public class GridDhtPartitionDemander {
}
/**
- * @return Is completed.
- */
- boolean isCompleted() {
- return completed;
- }
-
- /**
* @param nodeId Node id.
* @param p P.
*/
- void onMissedPartition(UUID nodeId, int p) {
+ private void partitionMissed(UUID nodeId, int p) {
lock.lock();
+
try {
if (isDone())
return;
@@ -941,8 +932,9 @@ public class GridDhtPartitionDemander {
* @param nodeId Node id.
* @param p P.
*/
- void onPartitionDone(UUID nodeId, int p) {
+ private void partitionDone(UUID nodeId, int p) {
lock.lock();
+
try {
if (isDone())
return;
@@ -951,17 +943,17 @@ public class GridDhtPartitionDemander {
preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
assigns.exchangeFuture().discoveryEvent());
- Collection<Integer> parts = remaining.get(nodeId);
+ Collection<Integer> parts = remaining.get(nodeId).get2();
if (parts != null) {
parts.remove(p);
if (parts.isEmpty()) {
- remaining.remove(nodeId);
-
U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
", from node=" + nodeId + ", topology=" + topologyVersion() +
- ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+ ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+
+ remaining.remove(nodeId);
}
}
@@ -994,7 +986,7 @@ public class GridDhtPartitionDemander {
/**
*
*/
- public void checkIsDone() {
+ private void checkIsDone() {
if (remaining.isEmpty()) {
if (log.isDebugEnabled())
log.debug("Completed sync future.");
@@ -1301,7 +1293,7 @@ public class GridDhtPartitionDemander {
// then we take ownership.
if (last) {
remaining.remove(p);
- fut.onPartitionDone(node.id(), p);
+ fut.partitionDone(node.id(), p);
top.own(part);
@@ -1320,7 +1312,7 @@ public class GridDhtPartitionDemander {
}
else {
remaining.remove(p);
- fut.onPartitionDone(node.id(), p);
+ fut.partitionDone(node.id(), p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
@@ -1328,7 +1320,7 @@ public class GridDhtPartitionDemander {
}
else {
remaining.remove(p);
- fut.onPartitionDone(node.id(), p);
+ fut.partitionDone(node.id(), p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
@@ -1342,7 +1334,7 @@ public class GridDhtPartitionDemander {
if (cctx.affinity().localNode(miss, topVer))
missed.add(miss);
- fut.onMissedPartition(node.id(), miss);
+ fut.partitionMissed(node.id(), miss);
}
if (remaining.isEmpty())
@@ -1379,7 +1371,7 @@ public class GridDhtPartitionDemander {
Collection<Integer> missed = new HashSet<>();
if (topologyChanged(topVer)) {
- fut.onCancel();
+ fut.cancel();
return;
}
@@ -1400,16 +1392,16 @@ public class GridDhtPartitionDemander {
log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
", msg=" + e.getMessage() + ']');
- fut.onCancel();
+ fut.cancel();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to receive partitions from node (rebalancing will not " +
"fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
- fut.onCancel(node.id());
+ fut.cancel(node.id());
}
catch (InterruptedException e) {
- fut.onCancel();
+ fut.cancel();
}
}
finally {
[2/4] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ce1dd0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ce1dd0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ce1dd0a
Branch: refs/heads/ignite-1093-2
Commit: 5ce1dd0aca5d485f91dcd6f47798b46424436cce
Parents: e52367b
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 16:49:12 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 16:49:12 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 192 ++++++++++---------
1 file changed, 102 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ce1dd0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 31e2e5e..9960435 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -124,7 +123,7 @@ public class GridDhtPartitionDemander {
boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
- syncFut = new SyncFuture(null, cctx, log, true);
+ syncFut = new SyncFuture();//Dummy.
if (!enabled)
// Calling onDone() immediately since preloading is disabled.
@@ -132,13 +131,13 @@ public class GridDhtPartitionDemander {
}
/**
- *
+ * Start.
*/
void start() {
}
/**
- *
+ * Stop.
*/
void stop() {
syncFut.cancel();
@@ -209,7 +208,7 @@ public class GridDhtPartitionDemander {
}
/**
- * @param name Name.
+ * @param name Cache name.
* @param fut Future.
*/
private void waitForCacheRebalancing(String name, SyncFuture fut) {
@@ -223,8 +222,6 @@ public class GridDhtPartitionDemander {
wFut.get();
else {
fut.cancel();
-
- return;
}
}
catch (IgniteInterruptedCheckedException ignored) {
@@ -232,8 +229,6 @@ public class GridDhtPartitionDemander {
log.debug("Failed to wait for " + name + " cache rebalancing future (grid is stopping): " +
"[cacheName=" + cctx.name() + ']');
fut.cancel();
-
- return;
}
}
catch (IgniteCheckedException e) {
@@ -246,7 +241,7 @@ public class GridDhtPartitionDemander {
/**
* @param assigns Assignments.
* @param force {@code True} if dummy reassign.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException Exception
*/
void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
@@ -258,46 +253,37 @@ public class GridDhtPartitionDemander {
if (delay == 0 || force) {
assert assigns != null;
- final AffinityTopologyVersion topVer = assigns.topologyVersion();
+ final SyncFuture oldFut = syncFut;
- SyncFuture fut = syncFut;
-
- if (fut.isInited()) {
- if (!fut.isDone())
- fut.cancel();
+ final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
- fut = new SyncFuture(assigns, cctx, log, false);
+ syncFut = fut;
- syncFut = fut;
- }
+ if (!oldFut.isDummy())
+ oldFut.cancel();
else
- fut.init(assigns);
-
- if (assigns.isEmpty()) {
- fut.checkIsDone();
-
- if (fut.assigns.topologyVersion().topologyVersion() > 1)// Not a First node.
- U.log(log, "Rebalancing is not required [cache=" + cctx.name() +
- ", topology=" + fut.assigns.topologyVersion() + "]");
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> future) {
+ oldFut.onDone();
+ }
+ });
+ if (fut.doneIfEmpty())// Done in case empty assigns.
return;
- }
- if (topologyChanged(topVer)) {
+ if (topologyChanged(fut.topologyVersion())) {
fut.cancel();
return;
}
- final SyncFuture curFut = fut;
-
IgniteThread thread = new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
@Override public void run() {
if (!CU.isMarshallerCache(cctx.name())) {
- waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, curFut);
+ waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, fut);
if (!CU.isUtilityCache(cctx.name())) {
- waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, curFut);
+ waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, fut);
}
}
@@ -312,10 +298,10 @@ public class GridDhtPartitionDemander {
log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
", rebalanceOrder=" + rebalanceOrder + ']');
- if (!topologyChanged(topVer))
+ if (!topologyChanged(fut.topologyVersion()))
oFut.get();
else {
- curFut.cancel();
+ fut.cancel();
return;
}
@@ -325,19 +311,19 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled()) {
log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
"[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
- curFut.cancel();
+ fut.cancel();
return;
}
}
catch (IgniteCheckedException e) {
- curFut.cancel();
+ fut.cancel();
throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
}
}
- requestPartitions(curFut);
+ requestPartitions(fut);
}
});
@@ -394,15 +380,12 @@ public class GridDhtPartitionDemander {
final CacheConfiguration cfg = cctx.config();
U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
- ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+ ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+ ", topology=" + d.topologyVersion() + "]");
//Check remote node rebalancing API version.
if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
- GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
-
- remainings.addAll(d.partitions());
-
- fut.appendPartitions(node.id(), remainings);
+ fut.appendPartitions(node.id(), d.partitions());
int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
@@ -627,24 +610,22 @@ public class GridDhtPartitionDemander {
for (Integer miss : supply.missed())
fut.partitionDone(id, miss);
- if (!fut.isDone()) {
- GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+ GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
- if (d != null) {
- // Create copy.
- GridDhtPartitionDemandMessage nextD =
- new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+ if (d != null) {
+ // Create copy.
+ GridDhtPartitionDemandMessage nextD =
+ new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
- nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+ nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
- if (!topologyChanged(topVer)) {
- // Send demand message.
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
- nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
- }
- else
- fut.cancel();
+ if (!topologyChanged(topVer)) {
+ // Send demand message.
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+ nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
}
+ else
+ fut.cancel();
}
}
catch (ClusterTopologyCheckedException e) {
@@ -759,12 +740,12 @@ public class GridDhtPartitionDemander {
/**
*
*/
- public static class SyncFuture extends GridFutureAdapter<Boolean> {
+ public static class SyncFuture extends GridFutureAdapter<Object> {
/** */
private static final long serialVersionUID = 1L;
/** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
- private final boolean sendStopEvnt;
+ private final boolean sendStoppedEvnt;
/** */
private final GridCacheContext<?, ?> cctx;
@@ -782,13 +763,13 @@ public class GridDhtPartitionDemander {
private final Lock lock = new ReentrantLock();
/** Assignments. */
- private volatile GridDhtPreloaderAssignments assigns;
-
- /** Completed. */
- private volatile boolean completed = true;
+ private final GridDhtPreloaderAssignments assigns;
/**
* @param assigns Assigns.
+ * @param cctx Context.
+ * @param log Logger.
+ * @param sentStopEvnt Stop event flag.
*/
SyncFuture(GridDhtPreloaderAssignments assigns,
GridCacheContext<?, ?> cctx,
@@ -797,35 +778,39 @@ public class GridDhtPartitionDemander {
this.assigns = assigns;
this.cctx = cctx;
this.log = log;
- this.sendStopEvnt = sentStopEvnt;
+ this.sendStoppedEvnt = sentStopEvnt;
+
+ if (assigns != null)
+ cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
+ new CI1<IgniteInternalFuture<Long>>() {
+ @Override public void apply(IgniteInternalFuture<Long> future) {
+ SyncFuture.this.cancel();
+ }
+ });
}
/**
- * @return Topology version.
+ * Dummy future. Will be done by real one.
*/
- public AffinityTopologyVersion topologyVersion() {
- return assigns != null ? assigns.topologyVersion() : null;
+ public SyncFuture() {
+ this.assigns = null;
+ this.cctx = null;
+ this.log = null;
+ this.sendStoppedEvnt = false;
}
/**
- * @param assigns Assigns.
+ * @return Topology version.
*/
- private void init(GridDhtPreloaderAssignments assigns) {
- this.assigns = assigns;
-
- cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
- new CI1<IgniteInternalFuture<Long>>() {
- @Override public void apply(IgniteInternalFuture<Long> future) {
- SyncFuture.this.cancel();
- }
- });
+ public AffinityTopologyVersion topologyVersion() {
+ return assigns != null ? assigns.topologyVersion() : null;
}
/**
- * @return Initialised or not.
+ * @return Is dummy (created at demander creation).
*/
- private boolean isInited() {
- return assigns != null;
+ private boolean isDummy() {
+ return assigns == null;
}
/**
@@ -845,6 +830,7 @@ public class GridDhtPartitionDemander {
/**
* @param node Node.
+ * @return Demand message.
*/
private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
if (isDone())
@@ -854,6 +840,36 @@ public class GridDhtPartitionDemander {
}
/**
+ * @return future is done.
+ */
+ private boolean doneIfEmpty() {
+ lock.lock();
+
+ try {
+ if (isDone())
+ return true;
+
+ if (assigns.isEmpty()) {
+ assert remaining.isEmpty();
+
+ if (assigns.topologyVersion().topologyVersion() > 1)// Not an initial topology.
+ U.log(log, "Rebalancing is not required [cache=" + cctx.name() +
+ ", topology=" + assigns.topologyVersion() + "]");
+
+ checkIsDone();
+
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
* Cancels this future.
*
* @return {@code true}.
@@ -867,8 +883,6 @@ public class GridDhtPartitionDemander {
remaining.clear();
- completed = false;
-
U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+ ", topology=" + topologyVersion());
@@ -892,13 +906,11 @@ public class GridDhtPartitionDemander {
return;
U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
- ", from node=" + nodeId + ", topology=" + topologyVersion() +
+ ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
remaining.remove(nodeId);
- completed = false;
-
checkIsDone();
}
finally {
@@ -919,7 +931,7 @@ public class GridDhtPartitionDemander {
return;
if (missed.get(nodeId) == null)
- missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+ missed.put(nodeId, new HashSet<Integer>());
missed.get(nodeId).add(p);
}
@@ -950,7 +962,7 @@ public class GridDhtPartitionDemander {
if (parts.isEmpty()) {
U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
- ", from node=" + nodeId + ", topology=" + topologyVersion() +
+ ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
remaining.remove(nodeId);
@@ -1007,10 +1019,10 @@ public class GridDhtPartitionDemander {
cctx.shared().exchange().scheduleResendPartitions();
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStopEvnt))
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
- onDone(completed);
+ onDone();
}
}
}
[3/4] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c45d2af4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c45d2af4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c45d2af4
Branch: refs/heads/ignite-1093-2
Commit: c45d2af46e2a80f1c82724807a834f1afecc2be7
Parents: 5ce1dd0
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 18:24:02 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 18:24:02 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 2 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 2 +-
.../GridCacheRebalancingSyncSelfTest.java | 67 ++++++++++++++++----
3 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45d2af4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 9960435..87a1a6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -193,7 +193,7 @@ public class GridDhtPartitionDemander {
* @return {@code True} if topology changed.
*/
private boolean topologyChanged(AffinityTopologyVersion topVer) {
- return cctx.affinity().affinityTopologyVersion().topologyVersion() != topVer.topologyVersion();
+ return !cctx.affinity().affinityTopologyVersion().equals(topVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45d2af4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 49e89ca..0686376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -104,7 +104,7 @@ class GridDhtPartitionSupplier {
assert d != null;
assert id != null;
- if (cctx.affinity().affinityTopologyVersion().topologyVersion() != d.topologyVersion().topologyVersion())
+ if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
return;
GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/c45d2af4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 07c3e7c..db0c8ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -117,6 +118,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
return iCfg;
}
+ /**
+ * @param ignite Ignite.
+ */
protected void generateData(Ignite ignite) {
generateData(ignite, CACHE_NAME_DHT_PARTITIONED);
generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
@@ -140,6 +144,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
}
+ /**
+ * @param ignite Ignite.
+ * @throws IgniteCheckedException Exception.
+ */
protected void checkData(Ignite ignite) throws IgniteCheckedException {
checkData(ignite, CACHE_NAME_DHT_PARTITIONED);
checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
@@ -149,7 +157,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite.
- * @throws IgniteCheckedException
+ * @param name Cache name.
+ * @throws IgniteCheckedException Exception.
*/
protected void checkData(Ignite ignite, String name) throws IgniteCheckedException {
for (int i = 0; i < TEST_SIZE; i++) {
@@ -162,7 +171,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception Exception
*/
public void testSimpleRebalancing() throws Exception {
Ignite ignite = startGrid(0);
@@ -189,10 +198,30 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
/**
- * @param id Id.
- * @param top Topology.
+ * @param id Node id.
+ * @param major Major ver.
+ * @param minor Minor ver.
+ * @throws IgniteCheckedException Exception.
+ */
+ protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
+ waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
+ }
+
+ /**
+ * @param id Node id.
+ * @param major Major ver.
+ * @throws IgniteCheckedException Exception.
*/
- protected void waitForRebalancing(int id, int top) throws IgniteCheckedException {
+ protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
+ waitForRebalancing(id, new AffinityTopologyVersion(major));
+ }
+
+ /**
+ * @param id Node id.
+ * @param top Topology version.
+ * @throws IgniteCheckedException
+ */
+ protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
boolean finished = false;
while (!finished) {
@@ -200,7 +229,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
GridDhtPartitionDemander.SyncFuture fut = (GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture();
- if (fut.topologyVersion().topologyVersion() != top) {
+ if (fut.topologyVersion() == null || !fut.topologyVersion().equals(top)) {
finished = false;
break;
@@ -229,6 +258,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
startGrid(1);
startGrid(2);
+ while (!concurrentStartFinished2) {
+ U.sleep(10);
+ }
+
+ //New cache should start rebalancing.
+ CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+ cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW");
+ cacheRCfg.setCacheMode(CacheMode.PARTITIONED);
+ cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+ grid(0).getOrCreateCache(cacheRCfg);
+
concurrentStartFinished = true;
}
catch (Exception e) {
@@ -256,10 +298,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
//wait until cache rebalanced in async mode
- waitForRebalancing(1, 5);
- waitForRebalancing(2, 5);
- waitForRebalancing(3, 5);
- waitForRebalancing(4, 5);
+
+ waitForRebalancing(1, 5, 1);
+ waitForRebalancing(2, 5, 1);
+ waitForRebalancing(3, 5, 1);
+ waitForRebalancing(4, 5, 1);
//cache rebalanced in async node
@@ -302,7 +345,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception Exception.
*/
public void testBackwardCompatibility() throws Exception {
Ignite ignite = startGrid(0);
@@ -327,7 +370,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception Exception.
*/
public void testNodeFailedAtRebalancing() throws Exception {
Ignite ignite = startGrid(0);
[4/4] ignite git commit: 1093
Posted by sb...@apache.org.
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fdfa62f0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fdfa62f0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fdfa62f0
Branch: refs/heads/ignite-1093-2
Commit: fdfa62f0ff67c3d8266f862e0a0b53e065a96f91
Parents: c45d2af
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 18:40:07 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 18:40:07 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/preloader/GridDhtPartitionDemander.java | 8 +-------
1 file changed, 1 insertion(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fdfa62f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 87a1a6b..aa7d90b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -497,15 +497,9 @@ public class GridDhtPartitionDemander {
final SyncFuture fut = syncFut;
- if (!fut.topologyVersion().equals(topVer))
+ if (!fut.topologyVersion().equals(topVer))//will check topology changed at loop.
return;
- if (topologyChanged(topVer)) {
- fut.cancel();
-
- return;
- }
-
ClusterNode node = cctx.node(id);
assert node != null;