You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 18:59:58 UTC
[2/4] ignite git commit: 1093
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ce1dd0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ce1dd0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ce1dd0a
Branch: refs/heads/ignite-1093-2
Commit: 5ce1dd0aca5d485f91dcd6f47798b46424436cce
Parents: e52367b
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 16:49:12 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 16:49:12 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 192 ++++++++++---------
1 file changed, 102 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ce1dd0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 31e2e5e..9960435 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -124,7 +123,7 @@ public class GridDhtPartitionDemander {
boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
- syncFut = new SyncFuture(null, cctx, log, true);
+ syncFut = new SyncFuture();//Dummy.
if (!enabled)
// Calling onDone() immediately since preloading is disabled.
@@ -132,13 +131,13 @@ public class GridDhtPartitionDemander {
}
/**
- *
+ * Start.
*/
void start() {
}
/**
- *
+ * Stop.
*/
void stop() {
syncFut.cancel();
@@ -209,7 +208,7 @@ public class GridDhtPartitionDemander {
}
/**
- * @param name Name.
+ * @param name Cache name.
* @param fut Future.
*/
private void waitForCacheRebalancing(String name, SyncFuture fut) {
@@ -223,8 +222,6 @@ public class GridDhtPartitionDemander {
wFut.get();
else {
fut.cancel();
-
- return;
}
}
catch (IgniteInterruptedCheckedException ignored) {
@@ -232,8 +229,6 @@ public class GridDhtPartitionDemander {
log.debug("Failed to wait for " + name + " cache rebalancing future (grid is stopping): " +
"[cacheName=" + cctx.name() + ']');
fut.cancel();
-
- return;
}
}
catch (IgniteCheckedException e) {
@@ -246,7 +241,7 @@ public class GridDhtPartitionDemander {
/**
* @param assigns Assignments.
* @param force {@code True} if dummy reassign.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException If failed.
*/
void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
@@ -258,46 +253,37 @@ public class GridDhtPartitionDemander {
if (delay == 0 || force) {
assert assigns != null;
- final AffinityTopologyVersion topVer = assigns.topologyVersion();
+ final SyncFuture oldFut = syncFut;
- SyncFuture fut = syncFut;
-
- if (fut.isInited()) {
- if (!fut.isDone())
- fut.cancel();
+ final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
- fut = new SyncFuture(assigns, cctx, log, false);
+ syncFut = fut;
- syncFut = fut;
- }
+ if (!oldFut.isDummy())
+ oldFut.cancel();
else
- fut.init(assigns);
-
- if (assigns.isEmpty()) {
- fut.checkIsDone();
-
- if (fut.assigns.topologyVersion().topologyVersion() > 1)// Not a First node.
- U.log(log, "Rebalancing is not required [cache=" + cctx.name() +
- ", topology=" + fut.assigns.topologyVersion() + "]");
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> future) {
+ oldFut.onDone();
+ }
+ });
+ if (fut.doneIfEmpty())// Done in case empty assigns.
return;
- }
- if (topologyChanged(topVer)) {
+ if (topologyChanged(fut.topologyVersion())) {
fut.cancel();
return;
}
- final SyncFuture curFut = fut;
-
IgniteThread thread = new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
@Override public void run() {
if (!CU.isMarshallerCache(cctx.name())) {
- waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, curFut);
+ waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, fut);
if (!CU.isUtilityCache(cctx.name())) {
- waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, curFut);
+ waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, fut);
}
}
@@ -312,10 +298,10 @@ public class GridDhtPartitionDemander {
log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
", rebalanceOrder=" + rebalanceOrder + ']');
- if (!topologyChanged(topVer))
+ if (!topologyChanged(fut.topologyVersion()))
oFut.get();
else {
- curFut.cancel();
+ fut.cancel();
return;
}
@@ -325,19 +311,19 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled()) {
log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
"[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
- curFut.cancel();
+ fut.cancel();
return;
}
}
catch (IgniteCheckedException e) {
- curFut.cancel();
+ fut.cancel();
throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
}
}
- requestPartitions(curFut);
+ requestPartitions(fut);
}
});
@@ -394,15 +380,12 @@ public class GridDhtPartitionDemander {
final CacheConfiguration cfg = cctx.config();
U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
- ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+ ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+ ", topology=" + d.topologyVersion() + "]");
//Check remote node rebalancing API version.
if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
- GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
-
- remainings.addAll(d.partitions());
-
- fut.appendPartitions(node.id(), remainings);
+ fut.appendPartitions(node.id(), d.partitions());
int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
@@ -627,24 +610,22 @@ public class GridDhtPartitionDemander {
for (Integer miss : supply.missed())
fut.partitionDone(id, miss);
- if (!fut.isDone()) {
- GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+ GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
- if (d != null) {
- // Create copy.
- GridDhtPartitionDemandMessage nextD =
- new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+ if (d != null) {
+ // Create copy.
+ GridDhtPartitionDemandMessage nextD =
+ new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
- nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+ nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
- if (!topologyChanged(topVer)) {
- // Send demand message.
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
- nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
- }
- else
- fut.cancel();
+ if (!topologyChanged(topVer)) {
+ // Send demand message.
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+ nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
}
+ else
+ fut.cancel();
}
}
catch (ClusterTopologyCheckedException e) {
@@ -759,12 +740,12 @@ public class GridDhtPartitionDemander {
/**
*
*/
- public static class SyncFuture extends GridFutureAdapter<Boolean> {
+ public static class SyncFuture extends GridFutureAdapter<Object> {
/** */
private static final long serialVersionUID = 1L;
/** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
- private final boolean sendStopEvnt;
+ private final boolean sendStoppedEvnt;
/** */
private final GridCacheContext<?, ?> cctx;
@@ -782,13 +763,13 @@ public class GridDhtPartitionDemander {
private final Lock lock = new ReentrantLock();
/** Assignments. */
- private volatile GridDhtPreloaderAssignments assigns;
-
- /** Completed. */
- private volatile boolean completed = true;
+ private final GridDhtPreloaderAssignments assigns;
/**
* @param assigns Assigns.
+ * @param cctx Context.
+ * @param log Logger.
+ * @param sentStopEvnt Whether the EVT_CACHE_REBALANCE_STOPPED event should be sent.
*/
SyncFuture(GridDhtPreloaderAssignments assigns,
GridCacheContext<?, ?> cctx,
@@ -797,35 +778,39 @@ public class GridDhtPartitionDemander {
this.assigns = assigns;
this.cctx = cctx;
this.log = log;
- this.sendStopEvnt = sentStopEvnt;
+ this.sendStoppedEvnt = sentStopEvnt;
+
+ if (assigns != null)
+ cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
+ new CI1<IgniteInternalFuture<Long>>() {
+ @Override public void apply(IgniteInternalFuture<Long> future) {
+ SyncFuture.this.cancel();
+ }
+ });
}
/**
- * @return Topology version.
+ * Dummy future. Will be done by real one.
*/
- public AffinityTopologyVersion topologyVersion() {
- return assigns != null ? assigns.topologyVersion() : null;
+ public SyncFuture() {
+ this.assigns = null;
+ this.cctx = null;
+ this.log = null;
+ this.sendStoppedEvnt = false;
}
/**
- * @param assigns Assigns.
+ * @return Topology version.
*/
- private void init(GridDhtPreloaderAssignments assigns) {
- this.assigns = assigns;
-
- cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
- new CI1<IgniteInternalFuture<Long>>() {
- @Override public void apply(IgniteInternalFuture<Long> future) {
- SyncFuture.this.cancel();
- }
- });
+ public AffinityTopologyVersion topologyVersion() {
+ return assigns != null ? assigns.topologyVersion() : null;
}
/**
- * @return Initialised or not.
+ * @return {@code True} if this is a dummy future (created at demander creation, without assignments).
*/
- private boolean isInited() {
- return assigns != null;
+ private boolean isDummy() {
+ return assigns == null;
}
/**
@@ -845,6 +830,7 @@ public class GridDhtPartitionDemander {
/**
* @param node Node.
+ * @return Demand message.
*/
private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
if (isDone())
@@ -854,6 +840,36 @@ public class GridDhtPartitionDemander {
}
/**
+ * @return {@code True} if the future is already done or the assignments are empty.
+ */
+ private boolean doneIfEmpty() {
+ lock.lock();
+
+ try {
+ if (isDone())
+ return true;
+
+ if (assigns.isEmpty()) {
+ assert remaining.isEmpty();
+
+ if (assigns.topologyVersion().topologyVersion() > 1)// Not an initial topology.
+ U.log(log, "Rebalancing is not required [cache=" + cctx.name() +
+ ", topology=" + assigns.topologyVersion() + "]");
+
+ checkIsDone();
+
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
* Cancels this future.
*
* @return {@code true}.
@@ -867,8 +883,6 @@ public class GridDhtPartitionDemander {
remaining.clear();
- completed = false;
-
U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+ ", topology=" + topologyVersion());
@@ -892,13 +906,11 @@ public class GridDhtPartitionDemander {
return;
U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
- ", from node=" + nodeId + ", topology=" + topologyVersion() +
+ ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
remaining.remove(nodeId);
- completed = false;
-
checkIsDone();
}
finally {
@@ -919,7 +931,7 @@ public class GridDhtPartitionDemander {
return;
if (missed.get(nodeId) == null)
- missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+ missed.put(nodeId, new HashSet<Integer>());
missed.get(nodeId).add(p);
}
@@ -950,7 +962,7 @@ public class GridDhtPartitionDemander {
if (parts.isEmpty()) {
U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
- ", from node=" + nodeId + ", topology=" + topologyVersion() +
+ ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
remaining.remove(nodeId);
@@ -1007,10 +1019,10 @@ public class GridDhtPartitionDemander {
cctx.shared().exchange().scheduleResendPartitions();
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStopEvnt))
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
- onDone(completed);
+ onDone();
}
}
}