You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/11/22 15:45:37 UTC
ignite git commit: IGNITE-4242 ExchangeManager should wait for cache
rebalancing in async way
Repository: ignite
Updated Branches:
refs/heads/ignite-4242 2d54d274a -> 5683a1552
IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5683a155
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5683a155
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5683a155
Branch: refs/heads/ignite-4242
Commit: 5683a15529001a6e32a263fbb0c8689e0f501d7a
Parents: 2d54d27
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 22 18:45:25 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 22 18:45:25 2016 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 43 +++++++++-----------
.../dht/preloader/GridDhtPartitionDemander.java | 24 +++++------
2 files changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5683a155/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 75aeeda..25814ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1557,8 +1557,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
orderMap.get(order).add(cacheId);
}
- Set<IgniteInternalFuture<Boolean>> prevRebFuts = rebFuts;
-
rebFuts = new HashSet<>();
Runnable r = null;
@@ -1569,14 +1567,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (Integer cacheId : orderMap.get(order)) {
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
- GridDhtPreloaderAssignments assignments = assignsMap.get(cacheId);
+ GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
- Runnable cur = cacheCtx.preloader().addAssignments(assignments,
+ Runnable cur = cacheCtx.preloader().addAssignments(assigns,
forcePreload,
cnt,
r);
- if (cur != null && !assignments.isEmpty())
+ if (!assigns.isEmpty())
rebList.add(cacheCtx.name());
rebFuts.add(cacheCtx.preloader().rebalanceFuture());
@@ -1585,34 +1583,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- if (prevRebFuts != null) // Wait for previous rebalancing (it's finished or cancelled).
- for (IgniteInternalFuture<Boolean> fut : prevRebFuts)
- fut.get();
+ boolean rebNeed = !rebList.isEmpty();
- if (r != null) {
- if (!rebList.isEmpty()) {
- Collections.reverse(rebList);
+ if (rebNeed) {
+ Collections.reverse(rebList);
- U.log(log, "Cache rebalancing scheduled: [order=" + rebList + "]");
- }
+ U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
+ }
+ else
+ U.log(log, "Skipping rebalancing (nothing scheduled) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- if (futQ.isEmpty()) {
- U.log(log, "Rebalancing required " +
+ if (futQ.isEmpty()) {
+ if (rebNeed)
+ U.log(log, "Rebalancing started " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- r.run(); // Starts rebalancing process.
- }
- else {
+ r.run();
+ }
+ else {
+ if (rebNeed)
U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- }
- }
- else {
- U.log(log, "Skipping rebalancing (nothing scheduled) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5683a155/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fadec5a..390cd08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -280,21 +280,18 @@ public class GridDhtPartitionDemander {
rebalanceFut = fut;
- if (next != null)
- rebalanceFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> fut) {
- next.run();
- }
- });
-
- if (assigns.isEmpty()) {
- U.log(log, "Rebalancing is not required.");
-
+ if (assigns.isEmpty())
((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
- }
return new Runnable() {
@Override public void run() {
+ if (next != null)
+ rebalanceFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ next.run();
+ }
+ });
+
if (assigns.isEmpty()) {
rebalanceFut.onDone(true); // Starts next cache preloading (according to order).
@@ -856,8 +853,7 @@ public class GridDhtPartitionDemander {
if (isDone())
return true;
- U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
- + ", topology=" + topologyVersion() + ']');
+ U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
if (!cctx.kernalContext().isStopping()) {
for (UUID nodeId : remaining.keySet())
@@ -888,7 +884,7 @@ public class GridDhtPartitionDemander {
remaining.remove(nodeId);
- onDone(false); // Finishing rebalance future a non completed.
+ onDone(false); // Finishing rebalance future as non completed.
checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
}