You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/11/21 20:27:03 UTC

[1/2] ignite git commit: IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way

Repository: ignite
Updated Branches:
  refs/heads/ignite-4242 787236a15 -> 2d54d274a


IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00861223
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00861223
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00861223

Branch: refs/heads/ignite-4242
Commit: 00861223101147dd3ecb60bec552bb06e226fd98
Parents: 787236a
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Nov 21 22:20:04 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Nov 21 22:20:04 2016 +0300

----------------------------------------------------------------------
 .../distributed/dht/preloader/GridDhtPartitionDemander.java     | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/00861223/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 999f99c..41b7bc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -292,6 +292,11 @@ public class GridDhtPartitionDemander {
                     try {
                         requestPartitions(fut, assigns);
                     }
+                    catch (ClusterTopologyCheckedException e){
+                        log.warning("Failed to send initial demand request to node.", e);
+
+                        fut.cancel();
+                    }
                     catch (IgniteCheckedException e) {
                         log.error("Failed to send initial demand request to node.", e);
 


[2/2] ignite git commit: IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way

Posted by av...@apache.org.
IGNITE-4242 ExchangeManager should wait for cache rebalancing in async way


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d54d274
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d54d274
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d54d274

Branch: refs/heads/ignite-4242
Commit: 2d54d274aa804d8e445887fbe8875ed3bc0d4eab
Parents: 0086122
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Nov 21 23:26:49 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Nov 21 23:26:49 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 12 +++--
 .../dht/preloader/GridDhtPartitionDemander.java | 51 ++++++++------------
 2 files changed, 27 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2d54d274/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index dd0256a..75aeeda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1569,12 +1569,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             for (Integer cacheId : orderMap.get(order)) {
                                 GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
-                                Runnable cur = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+                                GridDhtPreloaderAssignments assignments = assignsMap.get(cacheId);
+
+                                Runnable cur = cacheCtx.preloader().addAssignments(assignments,
                                     forcePreload,
                                     cnt,
                                     r);
 
-                                if (cur != null)
+                                if (cur != null && !assignments.isEmpty())
                                     rebList.add(cacheCtx.name());
 
                                 rebFuts.add(cacheCtx.preloader().rebalanceFuture());
@@ -1588,9 +1590,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 fut.get();
 
                         if (r != null) {
-                            Collections.reverse(rebList);
+                            if (!rebList.isEmpty()) {
+                                Collections.reverse(rebList);
 
-                            U.log(log, "Cache rebalancing scheduled: [order=" + rebList + "]");
+                                U.log(log, "Cache rebalancing scheduled: [order=" + rebList + "]");
+                            }
 
                             if (futQ.isEmpty()) {
                                 U.log(log, "Rebalancing required " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/2d54d274/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 41b7bc1..fadec5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -287,25 +287,37 @@ public class GridDhtPartitionDemander {
                     }
                 });
 
+            if (assigns.isEmpty()) {
+                U.log(log, "Rebalancing is not required.");
+
+                ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+            }
+
             return new Runnable() {
                 @Override public void run() {
+                    if (assigns.isEmpty()) {
+                        rebalanceFut.onDone(true); // Starts next cache preloading (according to order).
+
+                        return;
+                    }
+
                     try {
-                        requestPartitions(fut, assigns);
+                        requestPartitions(rebalanceFut, assigns);
                     }
-                    catch (ClusterTopologyCheckedException e){
+                    catch (ClusterTopologyCheckedException e) {
                         log.warning("Failed to send initial demand request to node.", e);
 
-                        fut.cancel();
+                        rebalanceFut.cancel();
                     }
                     catch (IgniteCheckedException e) {
                         log.error("Failed to send initial demand request to node.", e);
 
-                        fut.cancel();
+                        rebalanceFut.cancel();
                     }
                     catch (Throwable th) {
                         log.error("Runtime error caught during initial demand request sending.", th);
 
-                        fut.cancel();
+                        rebalanceFut.cancel();
 
                         if (th instanceof Error)
                             throw th;
@@ -351,12 +363,6 @@ public class GridDhtPartitionDemander {
         RebalanceFuture fut,
         GridDhtPreloaderAssignments assigns
     ) throws IgniteCheckedException {
-        if (assigns.isEmpty()) {
-            fut.doneIfEmpty(assigns.cancelled());
-
-            return;
-        }
-
         if (topologyChanged(fut)) {
             fut.cancel();
 
@@ -382,7 +388,7 @@ public class GridDhtPartitionDemander {
 
             //Check remote node rebalancing API version.
             if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
-                U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
                     ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
                     ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
 
@@ -841,24 +847,6 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * @param cancelled Is cancelled.
-         */
-        private void doneIfEmpty(boolean cancelled) {
-            synchronized (this) {
-                if (isDone())
-                    return;
-
-                assert remaining.isEmpty();
-
-                if (log.isDebugEnabled())
-                    log.debug("Rebalancing is not required [cache=" + cctx.name() +
-                        ", topology=" + topVer + "]");
-
-                checkIsDone(cancelled, true);
-            }
-        }
-
-        /**
          * Cancels this future.
          *
          * @return {@code True}.
@@ -981,8 +969,7 @@ public class GridDhtPartitionDemander {
 
                 if (parts.isEmpty()) {
                     U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
-                        "rebalancing [cache=" + cctx.name() +
-                        ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+                        "rebalancing [fromNode=" + nodeId + ", topology=" + topologyVersion() +
                         ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
 
                     remaining.remove(nodeId);