You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/11/22 15:45:37 UTC

ignite git commit: IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way

Repository: ignite
Updated Branches:
  refs/heads/ignite-4242 2d54d274a -> 5683a1552


IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5683a155
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5683a155
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5683a155

Branch: refs/heads/ignite-4242
Commit: 5683a15529001a6e32a263fbb0c8689e0f501d7a
Parents: 2d54d27
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 22 18:45:25 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 22 18:45:25 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 43 +++++++++-----------
 .../dht/preloader/GridDhtPartitionDemander.java | 24 +++++------
 2 files changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5683a155/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 75aeeda..25814ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1557,8 +1557,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             orderMap.get(order).add(cacheId);
                         }
 
-                        Set<IgniteInternalFuture<Boolean>> prevRebFuts = rebFuts;
-
                         rebFuts = new HashSet<>();
 
                         Runnable r = null;
@@ -1569,14 +1567,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             for (Integer cacheId : orderMap.get(order)) {
                                 GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
-                                GridDhtPreloaderAssignments assignments = assignsMap.get(cacheId);
+                                GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
 
-                                Runnable cur = cacheCtx.preloader().addAssignments(assignments,
+                                Runnable cur = cacheCtx.preloader().addAssignments(assigns,
                                     forcePreload,
                                     cnt,
                                     r);
 
-                                if (cur != null && !assignments.isEmpty())
+                                if (!assigns.isEmpty())
                                     rebList.add(cacheCtx.name());
 
                                 rebFuts.add(cacheCtx.preloader().rebalanceFuture());
@@ -1585,34 +1583,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             }
                         }
 
-                        if (prevRebFuts != null) // Wait for previous rebalancing (it's finished or cancelled).
-                            for (IgniteInternalFuture<Boolean> fut : prevRebFuts)
-                                fut.get();
+                        boolean rebNeed = !rebList.isEmpty();
 
-                        if (r != null) {
-                            if (!rebList.isEmpty()) {
-                                Collections.reverse(rebList);
+                        if (rebNeed) {
+                            Collections.reverse(rebList);
 
-                                U.log(log, "Cache rebalancing scheduled: [order=" + rebList + "]");
-                            }
+                            U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
+                        }
+                        else
+                            U.log(log, "Skipping rebalancing (nothing scheduled) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
 
-                            if (futQ.isEmpty()) {
-                                U.log(log, "Rebalancing required " +
+                        if (futQ.isEmpty()) {
+                            if (rebNeed)
+                                U.log(log, "Rebalancing started " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
 
-                                r.run(); // Starts rebalancing process.
-                            }
-                            else {
+                            r.run();
+                        }
+                        else {
+                            if (rebNeed)
                                 U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-                            }
-                        }
-                        else {
-                            U.log(log, "Skipping rebalancing (nothing scheduled) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5683a155/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fadec5a..390cd08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -280,21 +280,18 @@ public class GridDhtPartitionDemander {
 
             rebalanceFut = fut;
 
-            if (next != null)
-                rebalanceFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                        next.run();
-                    }
-                });
-
-            if (assigns.isEmpty()) {
-                U.log(log, "Rebalancing is not required.");
-
+            if (assigns.isEmpty())
                 ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
-            }
 
             return new Runnable() {
                 @Override public void run() {
+                    if (next != null)
+                        rebalanceFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                            @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                                next.run();
+                            }
+                        });
+
                     if (assigns.isEmpty()) {
                         rebalanceFut.onDone(true); // Starts next cache preloading (according to order).
 
@@ -856,8 +853,7 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return true;
 
-                U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
-                    + ", topology=" + topologyVersion() + ']');
+                U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
 
                 if (!cctx.kernalContext().isStopping()) {
                     for (UUID nodeId : remaining.keySet())
@@ -888,7 +884,7 @@ public class GridDhtPartitionDemander {
 
                 remaining.remove(nodeId);
 
-                onDone(false); // Finishing rebalance future a non completed.
+                onDone(false); // Finishing rebalance future as non completed.
 
                 checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
             }