You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 18:59:57 UTC

[1/4] ignite git commit: Release notes updated

Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 a5fc6f35b -> fdfa62f0f


Release notes updated


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e52367bc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e52367bc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e52367bc

Branch: refs/heads/ignite-1093-2
Commit: e52367bc0a57028d86f356101cfa46cd70e35e12
Parents: a5fc6f3
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 13:16:34 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 13:16:34 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 166 +++++++++----------
 1 file changed, 79 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e52367bc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 2e54294..31e2e5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -68,11 +68,11 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
@@ -141,7 +141,7 @@ public class GridDhtPartitionDemander {
      *
      */
     void stop() {
-        syncFut.onCancel();
+        syncFut.cancel();
 
         lastExchangeFut = null;
 
@@ -222,7 +222,7 @@ public class GridDhtPartitionDemander {
             if (!topologyChanged(fut.assigns.topologyVersion()))
                 wFut.get();
             else {
-                fut.onCancel();
+                fut.cancel();
 
                 return;
             }
@@ -231,13 +231,13 @@ public class GridDhtPartitionDemander {
             if (log.isDebugEnabled()) {
                 log.debug("Failed to wait for " + name + " cache rebalancing future (grid is stopping): " +
                     "[cacheName=" + cctx.name() + ']');
-                fut.onCancel();
+                fut.cancel();
 
                 return;
             }
         }
         catch (IgniteCheckedException e) {
-            fut.onCancel();
+            fut.cancel();
 
             throw new Error("Ordered rebalancing future should never fail: " + e.getMessage(), e);
         }
@@ -264,7 +264,7 @@ public class GridDhtPartitionDemander {
 
             if (fut.isInited()) {
                 if (!fut.isDone())
-                    fut.onCancel();
+                    fut.cancel();
 
                 fut = new SyncFuture(assigns, cctx, log, false);
 
@@ -284,7 +284,7 @@ public class GridDhtPartitionDemander {
             }
 
             if (topologyChanged(topVer)) {
-                fut.onCancel();
+                fut.cancel();
 
                 return;
             }
@@ -315,7 +315,7 @@ public class GridDhtPartitionDemander {
                                 if (!topologyChanged(topVer))
                                     oFut.get();
                                 else {
-                                    curFut.onCancel();
+                                    curFut.cancel();
 
                                     return;
                                 }
@@ -325,13 +325,13 @@ public class GridDhtPartitionDemander {
                             if (log.isDebugEnabled()) {
                                 log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
                                     "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-                                curFut.onCancel();
+                                curFut.cancel();
 
                                 return;
                             }
                         }
                         catch (IgniteCheckedException e) {
-                            curFut.onCancel();
+                            curFut.cancel();
 
                             throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
                         }
@@ -379,7 +379,7 @@ public class GridDhtPartitionDemander {
 
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             if (topologyChanged(topVer)) {
-                fut.onCancel();
+                fut.cancel();
 
                 return;
             }
@@ -393,10 +393,6 @@ public class GridDhtPartitionDemander {
 
             final CacheConfiguration cfg = cctx.config();
 
-            final long start = U.currentTimeMillis();
-
-            fut.logStart(node.id(), start);
-
             U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
                 ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
 
@@ -406,7 +402,7 @@ public class GridDhtPartitionDemander {
 
                 remainings.addAll(d.partitions());
 
-                fut.append(node.id(), remainings);
+                fut.appendPartitions(node.id(), remainings);
 
                 int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
@@ -434,10 +430,10 @@ public class GridDhtPartitionDemander {
                             if (!topologyChanged(topVer))
                                 cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
                             else
-                                fut.onCancel();
+                                fut.cancel();
                         }
                         catch (IgniteCheckedException ex) {
-                            fut.onCancel();
+                            fut.cancel();
 
                             U.error(log, "Failed to send partition demand message to node", ex);
                         }
@@ -450,7 +446,7 @@ public class GridDhtPartitionDemander {
             else {
                 DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
 
-                fut.append(node.id(), d.partitions());
+                fut.appendPartitions(node.id(), d.partitions());
 
                 dw.run(node, d);
             }
@@ -522,7 +518,7 @@ public class GridDhtPartitionDemander {
             return;
 
         if (topologyChanged(topVer)) {
-            fut.onCancel();
+            fut.cancel();
 
             return;
         }
@@ -539,7 +535,7 @@ public class GridDhtPartitionDemander {
             if (log.isDebugEnabled())
                 log.debug("Class got undeployed during preloading: " + supply.classError());
 
-            fut.onCancel(id);
+            fut.cancel(id);
 
             return;
         }
@@ -550,7 +546,7 @@ public class GridDhtPartitionDemander {
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                 if (topologyChanged(topVer)) {
-                    fut.onCancel();
+                    fut.cancel();
 
                     return;
                 }
@@ -597,7 +593,7 @@ public class GridDhtPartitionDemander {
                             if (last) {
                                 top.own(part);
 
-                                fut.onPartitionDone(id, p);
+                                fut.partitionDone(id, p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Finished rebalancing partition: " + part);
@@ -609,14 +605,14 @@ public class GridDhtPartitionDemander {
                         }
                     }
                     else {
-                        fut.onPartitionDone(id, p);
+                        fut.partitionDone(id, p);
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
                     }
                 }
                 else {
-                    fut.onPartitionDone(id, p);
+                    fut.partitionDone(id, p);
 
                     if (log.isDebugEnabled())
                         log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
@@ -626,10 +622,10 @@ public class GridDhtPartitionDemander {
             // Only request partitions based on latest topology version.
             for (Integer miss : supply.missed())
                 if (cctx.affinity().localNode(miss, topVer))
-                    fut.onMissedPartition(id, miss);
+                    fut.partitionMissed(id, miss);
 
             for (Integer miss : supply.missed())
-                fut.onPartitionDone(id, miss);
+                fut.partitionDone(id, miss);
 
             if (!fut.isDone()) {
                 GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
@@ -647,7 +643,7 @@ public class GridDhtPartitionDemander {
                             nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
                     }
                     else
-                        fut.onCancel();
+                        fut.cancel();
                 }
             }
         }
@@ -655,13 +651,13 @@ public class GridDhtPartitionDemander {
             if (log.isDebugEnabled())
                 log.debug("Node left during rebalancing [node=" + node.id() +
                     ", msg=" + e.getMessage() + ']');
-            fut.onCancel();
+            fut.cancel();
         }
         catch (IgniteCheckedException ex) {
             U.error(log, "Failed to receive partitions from node (rebalancing will not " +
                 "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
 
-            fut.onCancel(node.id());
+            fut.cancel(node.id());
         }
     }
 
@@ -767,6 +763,9 @@ public class GridDhtPartitionDemander {
         /** */
         private static final long serialVersionUID = 1L;
 
+        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */
+        private final boolean sendStopEvnt;
+
         /** */
         private final GridCacheContext<?, ?> cctx;
 
@@ -774,13 +773,10 @@ public class GridDhtPartitionDemander {
         private final IgniteLogger log;
 
         /** Remaining. */
-        private final ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
+        private final Map<UUID, IgniteBiTuple<Long, Collection<Integer>>> remaining = new HashMap<>();
 
         /** Missed. */
-        private final ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
-
-        /** Started time. */
-        private final ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
+        private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
 
         /** Lock. */
         private final Lock lock = new ReentrantLock();
@@ -791,12 +787,12 @@ public class GridDhtPartitionDemander {
         /** Completed. */
         private volatile boolean completed = true;
 
-        private final boolean sendStopEvnt;
-
         /**
          * @param assigns Assigns.
          */
-        SyncFuture(GridDhtPreloaderAssignments assigns, GridCacheContext<?, ?> cctx, IgniteLogger log,
+        SyncFuture(GridDhtPreloaderAssignments assigns,
+            GridCacheContext<?, ?> cctx,
+            IgniteLogger log,
             boolean sentStopEvnt) {
             this.assigns = assigns;
             this.cctx = cctx;
@@ -814,13 +810,13 @@ public class GridDhtPartitionDemander {
         /**
          * @param assigns Assigns.
          */
-        void init(GridDhtPreloaderAssignments assigns) {
+        private void init(GridDhtPreloaderAssignments assigns) {
             this.assigns = assigns;
 
             cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
                 new CI1<IgniteInternalFuture<Long>>() {
                     @Override public void apply(IgniteInternalFuture<Long> future) {
-                        SyncFuture.this.onCancel();
+                        SyncFuture.this.cancel();
                     }
                 });
         }
@@ -828,7 +824,7 @@ public class GridDhtPartitionDemander {
         /**
          * @return Initialised or not.
          */
-        boolean isInited() {
+        private boolean isInited() {
             return assigns != null;
         }
 
@@ -836,24 +832,21 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          * @param parts Parts.
          */
-        void append(UUID nodeId, Collection<Integer> parts) {
-            remaining.put(nodeId, parts);
-
-            missed.put(nodeId, new GridConcurrentHashSet<Integer>());
-        }
+        private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
+            lock.lock();
 
-        /**
-         * @param nodeId Node id.
-         * @param time Time.
-         */
-        void logStart(UUID nodeId, long time) {
-            started.put(nodeId, time);
+            try {
+                remaining.put(nodeId, new IgniteBiTuple<>(System.currentTimeMillis(), parts));
+            }
+            finally {
+                lock.unlock();
+            }
         }
 
         /**
          * @param node Node.
          */
-        GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
+        private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
             if (isDone())
                 return null;
 
@@ -861,47 +854,51 @@ public class GridDhtPartitionDemander {
         }
 
         /**
+         * Cancels this future.
          *
+         * @return {@code true}.
          */
-        void onCancel() {
+        @Override public boolean cancel() {
             lock.lock();
+
             try {
                 if (isDone())
-                    return;
+                    return true;
 
                 remaining.clear();
 
                 completed = false;
 
-                U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing from all nodes [cache=" + cctx.name()
-                    + ", topology=" + topologyVersion() +
-                    ", time=" +
-                    (started.isEmpty() ? 0 : (U.currentTimeMillis() - Collections.min(started.values()))) + " ms]");
+                U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+                    + ", topology=" + topologyVersion());
 
                 checkIsDone();
             }
             finally {
                 lock.unlock();
             }
+
+            return true;
         }
 
         /**
          * @param nodeId Node id.
          */
-        void onCancel(UUID nodeId) {
+        private void cancel(UUID nodeId) {
             lock.lock();
+
             try {
                 if (isDone())
                     return;
 
+                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                    ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                    ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+
                 remaining.remove(nodeId);
 
                 completed = false;
 
-                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
-                    ", from node=" + nodeId + ", topology=" + topologyVersion() +
-                    ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
-
                 checkIsDone();
             }
             finally {
@@ -911,18 +908,12 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * @return Is completed.
-         */
-        boolean isCompleted() {
-            return completed;
-        }
-
-        /**
          * @param nodeId Node id.
          * @param p P.
          */
-        void onMissedPartition(UUID nodeId, int p) {
+        private void partitionMissed(UUID nodeId, int p) {
             lock.lock();
+
             try {
                 if (isDone())
                     return;
@@ -941,8 +932,9 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          * @param p P.
          */
-        void onPartitionDone(UUID nodeId, int p) {
+        private void partitionDone(UUID nodeId, int p) {
             lock.lock();
+
             try {
                 if (isDone())
                     return;
@@ -951,17 +943,17 @@ public class GridDhtPartitionDemander {
                     preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
                         assigns.exchangeFuture().discoveryEvent());
 
-                Collection<Integer> parts = remaining.get(nodeId);
+                Collection<Integer> parts = remaining.get(nodeId).get2();
 
                 if (parts != null) {
                     parts.remove(p);
 
                     if (parts.isEmpty()) {
-                        remaining.remove(nodeId);
-
                         U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
                             ", from node=" + nodeId + ", topology=" + topologyVersion() +
-                            ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+                            ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+
+                        remaining.remove(nodeId);
                     }
                 }
 
@@ -994,7 +986,7 @@ public class GridDhtPartitionDemander {
         /**
          *
          */
-        public void checkIsDone() {
+        private void checkIsDone() {
             if (remaining.isEmpty()) {
                 if (log.isDebugEnabled())
                     log.debug("Completed sync future.");
@@ -1301,7 +1293,7 @@ public class GridDhtPartitionDemander {
                                         // then we take ownership.
                                         if (last) {
                                             remaining.remove(p);
-                                            fut.onPartitionDone(node.id(), p);
+                                            fut.partitionDone(node.id(), p);
 
                                             top.own(part);
 
@@ -1320,7 +1312,7 @@ public class GridDhtPartitionDemander {
                                 }
                                 else {
                                     remaining.remove(p);
-                                    fut.onPartitionDone(node.id(), p);
+                                    fut.partitionDone(node.id(), p);
 
                                     if (log.isDebugEnabled())
                                         log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
@@ -1328,7 +1320,7 @@ public class GridDhtPartitionDemander {
                             }
                             else {
                                 remaining.remove(p);
-                                fut.onPartitionDone(node.id(), p);
+                                fut.partitionDone(node.id(), p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
@@ -1342,7 +1334,7 @@ public class GridDhtPartitionDemander {
                             if (cctx.affinity().localNode(miss, topVer))
                                 missed.add(miss);
 
-                            fut.onMissedPartition(node.id(), miss);
+                            fut.partitionMissed(node.id(), miss);
                         }
 
                         if (remaining.isEmpty())
@@ -1379,7 +1371,7 @@ public class GridDhtPartitionDemander {
                 Collection<Integer> missed = new HashSet<>();
 
                 if (topologyChanged(topVer)) {
-                    fut.onCancel();
+                    fut.cancel();
 
                     return;
                 }
@@ -1400,16 +1392,16 @@ public class GridDhtPartitionDemander {
                         log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
                             ", msg=" + e.getMessage() + ']');
 
-                    fut.onCancel();
+                    fut.cancel();
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to receive partitions from node (rebalancing will not " +
                         "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
 
-                    fut.onCancel(node.id());
+                    fut.cancel(node.id());
                 }
                 catch (InterruptedException e) {
-                    fut.onCancel();
+                    fut.cancel();
                 }
             }
             finally {


[2/4] ignite git commit: 1093

Posted by sb...@apache.org.
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ce1dd0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ce1dd0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ce1dd0a

Branch: refs/heads/ignite-1093-2
Commit: 5ce1dd0aca5d485f91dcd6f47798b46424436cce
Parents: e52367b
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 16:49:12 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 16:49:12 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 192 ++++++++++---------
 1 file changed, 102 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5ce1dd0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 31e2e5e..9960435 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -58,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -124,7 +123,7 @@ public class GridDhtPartitionDemander {
 
         boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
 
-        syncFut = new SyncFuture(null, cctx, log, true);
+        syncFut = new SyncFuture();//Dummy.
 
         if (!enabled)
             // Calling onDone() immediately since preloading is disabled.
@@ -132,13 +131,13 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     *
+     * Start.
      */
     void start() {
     }
 
     /**
-     *
+     * Stop.
      */
     void stop() {
         syncFut.cancel();
@@ -209,7 +208,7 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param name Name.
+     * @param name Cache name.
      * @param fut Future.
      */
     private void waitForCacheRebalancing(String name, SyncFuture fut) {
@@ -223,8 +222,6 @@ public class GridDhtPartitionDemander {
                 wFut.get();
             else {
                 fut.cancel();
-
-                return;
             }
         }
         catch (IgniteInterruptedCheckedException ignored) {
@@ -232,8 +229,6 @@ public class GridDhtPartitionDemander {
                 log.debug("Failed to wait for " + name + " cache rebalancing future (grid is stopping): " +
                     "[cacheName=" + cctx.name() + ']');
                 fut.cancel();
-
-                return;
             }
         }
         catch (IgniteCheckedException e) {
@@ -246,7 +241,7 @@ public class GridDhtPartitionDemander {
     /**
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException Exception
      */
 
     void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
@@ -258,46 +253,37 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             assert assigns != null;
 
-            final AffinityTopologyVersion topVer = assigns.topologyVersion();
+            final SyncFuture oldFut = syncFut;
 
-            SyncFuture fut = syncFut;
-
-            if (fut.isInited()) {
-                if (!fut.isDone())
-                    fut.cancel();
+            final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy());
 
-                fut = new SyncFuture(assigns, cctx, log, false);
+            syncFut = fut;
 
-                syncFut = fut;
-            }
+            if (!oldFut.isDummy())
+                oldFut.cancel();
             else
-                fut.init(assigns);
-
-            if (assigns.isEmpty()) {
-                fut.checkIsDone();
-
-                if (fut.assigns.topologyVersion().topologyVersion() > 1)// Not a First node.
-                    U.log(log, "Rebalancing is not required [cache=" + cctx.name() +
-                        ", topology=" + fut.assigns.topologyVersion() + "]");
+                fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> future) {
+                        oldFut.onDone();
+                    }
+                });
 
+            if (fut.doneIfEmpty())// Done in case empty assigns.
                 return;
-            }
 
-            if (topologyChanged(topVer)) {
+            if (topologyChanged(fut.topologyVersion())) {
                 fut.cancel();
 
                 return;
             }
 
-            final SyncFuture curFut = fut;
-
             IgniteThread thread = new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
                 @Override public void run() {
                     if (!CU.isMarshallerCache(cctx.name())) {
-                        waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, curFut);
+                        waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, fut);
 
                         if (!CU.isUtilityCache(cctx.name())) {
-                            waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, curFut);
+                            waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, fut);
                         }
                     }
 
@@ -312,10 +298,10 @@ public class GridDhtPartitionDemander {
                                     log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
                                         ", rebalanceOrder=" + rebalanceOrder + ']');
 
-                                if (!topologyChanged(topVer))
+                                if (!topologyChanged(fut.topologyVersion()))
                                     oFut.get();
                                 else {
-                                    curFut.cancel();
+                                    fut.cancel();
 
                                     return;
                                 }
@@ -325,19 +311,19 @@ public class GridDhtPartitionDemander {
                             if (log.isDebugEnabled()) {
                                 log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
                                     "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-                                curFut.cancel();
+                                fut.cancel();
 
                                 return;
                             }
                         }
                         catch (IgniteCheckedException e) {
-                            curFut.cancel();
+                            fut.cancel();
 
                             throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
                         }
                     }
 
-                    requestPartitions(curFut);
+                    requestPartitions(fut);
                 }
             });
 
@@ -394,15 +380,12 @@ public class GridDhtPartitionDemander {
             final CacheConfiguration cfg = cctx.config();
 
             U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
-                ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+                ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+                ", topology=" + d.topologyVersion() + "]");
 
             //Check remote node rebalancing API version.
             if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
-                GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
-
-                remainings.addAll(d.partitions());
-
-                fut.appendPartitions(node.id(), remainings);
+                fut.appendPartitions(node.id(), d.partitions());
 
                 int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
@@ -627,24 +610,22 @@ public class GridDhtPartitionDemander {
             for (Integer miss : supply.missed())
                 fut.partitionDone(id, miss);
 
-            if (!fut.isDone()) {
-                GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+            GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
 
-                if (d != null) {
-                    // Create copy.
-                    GridDhtPartitionDemandMessage nextD =
-                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+            if (d != null) {
+                // Create copy.
+                GridDhtPartitionDemandMessage nextD =
+                    new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
 
-                    nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+                nextD.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
-                    if (!topologyChanged(topVer)) {
-                        // Send demand message.
-                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
-                            nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
-                    }
-                    else
-                        fut.cancel();
+                if (!topologyChanged(topVer)) {
+                    // Send demand message.
+                    cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                        nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
                 }
+                else
+                    fut.cancel();
             }
         }
         catch (ClusterTopologyCheckedException e) {
@@ -759,12 +740,12 @@ public class GridDhtPartitionDemander {
     /**
      *
      */
-    public static class SyncFuture extends GridFutureAdapter<Boolean> {
+    public static class SyncFuture extends GridFutureAdapter<Object> {
         /** */
         private static final long serialVersionUID = 1L;
 
         /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
-        private final boolean sendStopEvnt;
+        private final boolean sendStoppedEvnt;
 
         /** */
         private final GridCacheContext<?, ?> cctx;
@@ -782,13 +763,13 @@ public class GridDhtPartitionDemander {
         private final Lock lock = new ReentrantLock();
 
         /** Assignments. */
-        private volatile GridDhtPreloaderAssignments assigns;
-
-        /** Completed. */
-        private volatile boolean completed = true;
+        private final GridDhtPreloaderAssignments assigns;
 
         /**
          * @param assigns Assigns.
+         * @param cctx Context.
+         * @param log Logger.
+         * @param sentStopEvnt Stop event flag.
          */
         SyncFuture(GridDhtPreloaderAssignments assigns,
             GridCacheContext<?, ?> cctx,
@@ -797,35 +778,39 @@ public class GridDhtPartitionDemander {
             this.assigns = assigns;
             this.cctx = cctx;
             this.log = log;
-            this.sendStopEvnt = sentStopEvnt;
+            this.sendStoppedEvnt = sentStopEvnt;
+
+            if (assigns != null)
+                cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
+                    new CI1<IgniteInternalFuture<Long>>() {
+                        @Override public void apply(IgniteInternalFuture<Long> future) {
+                            SyncFuture.this.cancel();
+                        }
+                    });
         }
 
         /**
-         * @return Topology version.
+         * Dummy future. Will be done by real one.
          */
-        public AffinityTopologyVersion topologyVersion() {
-            return assigns != null ? assigns.topologyVersion() : null;
+        public SyncFuture() {
+            this.assigns = null;
+            this.cctx = null;
+            this.log = null;
+            this.sendStoppedEvnt = false;
         }
 
         /**
-         * @param assigns Assigns.
+         * @return Topology version.
          */
-        private void init(GridDhtPreloaderAssignments assigns) {
-            this.assigns = assigns;
-
-            cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
-                new CI1<IgniteInternalFuture<Long>>() {
-                    @Override public void apply(IgniteInternalFuture<Long> future) {
-                        SyncFuture.this.cancel();
-                    }
-                });
+        public AffinityTopologyVersion topologyVersion() {
+            return assigns != null ? assigns.topologyVersion() : null;
         }
 
         /**
-         * @return Initialised or not.
+         * @return Is dummy (created at demander creation).
          */
-        private boolean isInited() {
-            return assigns != null;
+        private boolean isDummy() {
+            return assigns == null;
         }
 
         /**
@@ -845,6 +830,7 @@ public class GridDhtPartitionDemander {
 
         /**
          * @param node Node.
+         * @return Demand message.
          */
         private GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
             if (isDone())
@@ -854,6 +840,36 @@ public class GridDhtPartitionDemander {
         }
 
         /**
+         * @return {@code True} if future is already done or assignments are empty.
+         */
+        private boolean doneIfEmpty() {
+            lock.lock();
+
+            try {
+                if (isDone())
+                    return true;
+
+                if (assigns.isEmpty()) {
+                    assert remaining.isEmpty();
+
+                    if (assigns.topologyVersion().topologyVersion() > 1)// Not an initial topology.
+                        U.log(log, "Rebalancing is not required [cache=" + cctx.name() +
+                            ", topology=" + assigns.topologyVersion() + "]");
+
+                    checkIsDone();
+
+                    return true;
+                }
+                else {
+                    return false;
+                }
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
          * Cancels this future.
          *
          * @return {@code true}.
@@ -867,8 +883,6 @@ public class GridDhtPartitionDemander {
 
                 remaining.clear();
 
-                completed = false;
-
                 U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
                     + ", topology=" + topologyVersion());
 
@@ -892,13 +906,11 @@ public class GridDhtPartitionDemander {
                     return;
 
                 U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
-                    ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                    ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
                     ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
 
                 remaining.remove(nodeId);
 
-                completed = false;
-
                 checkIsDone();
             }
             finally {
@@ -919,7 +931,7 @@ public class GridDhtPartitionDemander {
                     return;
 
                 if (missed.get(nodeId) == null)
-                    missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+                    missed.put(nodeId, new HashSet<Integer>());
 
                 missed.get(nodeId).add(p);
             }
@@ -950,7 +962,7 @@ public class GridDhtPartitionDemander {
 
                     if (parts.isEmpty()) {
                         U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
-                            ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                            ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
                             ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
 
                         remaining.remove(nodeId);
@@ -1007,10 +1019,10 @@ public class GridDhtPartitionDemander {
 
                 cctx.shared().exchange().scheduleResendPartitions();
 
-                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStopEvnt))
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
                     preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
 
-                onDone(completed);
+                onDone();
             }
         }
     }


[3/4] ignite git commit: 1093

Posted by sb...@apache.org.
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c45d2af4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c45d2af4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c45d2af4

Branch: refs/heads/ignite-1093-2
Commit: c45d2af46e2a80f1c82724807a834f1afecc2be7
Parents: 5ce1dd0
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 18:24:02 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 18:24:02 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java |  2 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |  2 +-
 .../GridCacheRebalancingSyncSelfTest.java       | 67 ++++++++++++++++----
 3 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c45d2af4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 9960435..87a1a6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -193,7 +193,7 @@ public class GridDhtPartitionDemander {
      * @return {@code True} if topology changed.
      */
     private boolean topologyChanged(AffinityTopologyVersion topVer) {
-        return cctx.affinity().affinityTopologyVersion().topologyVersion() != topVer.topologyVersion();
+        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c45d2af4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 49e89ca..0686376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -104,7 +104,7 @@ class GridDhtPartitionSupplier {
         assert d != null;
         assert id != null;
 
-        if (cctx.affinity().affinityTopologyVersion().topologyVersion() != d.topologyVersion().topologyVersion())
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
             return;
 
         GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/c45d2af4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 07c3e7c..db0c8ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -117,6 +118,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         return iCfg;
     }
 
+    /**
+     * @param ignite Ignite.
+     */
     protected void generateData(Ignite ignite) {
         generateData(ignite, CACHE_NAME_DHT_PARTITIONED);
         generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
@@ -140,6 +144,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     * @param ignite Ignite.
+     * @throws IgniteCheckedException Exception.
+     */
     protected void checkData(Ignite ignite) throws IgniteCheckedException {
         checkData(ignite, CACHE_NAME_DHT_PARTITIONED);
         checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
@@ -149,7 +157,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
     /**
      * @param ignite Ignite.
-     * @throws IgniteCheckedException
+     * @param name Cache name.
+     * @throws IgniteCheckedException Exception.
      */
     protected void checkData(Ignite ignite, String name) throws IgniteCheckedException {
         for (int i = 0; i < TEST_SIZE; i++) {
@@ -162,7 +171,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception Exception.
      */
     public void testSimpleRebalancing() throws Exception {
         Ignite ignite = startGrid(0);
@@ -189,10 +198,30 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param id Id.
-     * @param top Topology.
+     * @param id Node id.
+     * @param major Major ver.
+     * @param minor Minor ver.
+     * @throws IgniteCheckedException Exception.
+     */
+    protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
+        waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
+    }
+
+    /**
+     * @param id Node id.
+     * @param major Major ver.
+     * @throws IgniteCheckedException Exception.
      */
-    protected void waitForRebalancing(int id, int top) throws IgniteCheckedException {
+    protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
+        waitForRebalancing(id, new AffinityTopologyVersion(major));
+    }
+
+    /**
+     * @param id Node id.
+     * @param top Topology version.
+     * @throws IgniteCheckedException Exception.
+     */
+    protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
         boolean finished = false;
 
         while (!finished) {
@@ -200,7 +229,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
             for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
                 GridDhtPartitionDemander.SyncFuture fut = (GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture();
-                if (fut.topologyVersion().topologyVersion() != top) {
+                if (fut.topologyVersion() == null || !fut.topologyVersion().equals(top)) {
                     finished = false;
 
                     break;
@@ -229,6 +258,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                     startGrid(1);
                     startGrid(2);
 
+                    while (!concurrentStartFinished2) {
+                        U.sleep(10);
+                    }
+
+                    //New cache should start rebalancing.
+                    CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+                    cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW");
+                    cacheRCfg.setCacheMode(CacheMode.PARTITIONED);
+                    cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+                    grid(0).getOrCreateCache(cacheRCfg);
+
                     concurrentStartFinished = true;
                 }
                 catch (Exception e) {
@@ -256,10 +298,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         }
 
         //wait until cache rebalanced in async mode
-        waitForRebalancing(1, 5);
-        waitForRebalancing(2, 5);
-        waitForRebalancing(3, 5);
-        waitForRebalancing(4, 5);
+
+        waitForRebalancing(1, 5, 1);
+        waitForRebalancing(2, 5, 1);
+        waitForRebalancing(3, 5, 1);
+        waitForRebalancing(4, 5, 1);
 
         //cache rebalanced in async node
 
@@ -302,7 +345,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception Exception.
      */
     public void testBackwardCompatibility() throws Exception {
         Ignite ignite = startGrid(0);
@@ -327,7 +370,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception Exception.
      */
     public void testNodeFailedAtRebalancing() throws Exception {
         Ignite ignite = startGrid(0);


[4/4] ignite git commit: 1093

Posted by sb...@apache.org.
1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fdfa62f0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fdfa62f0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fdfa62f0

Branch: refs/heads/ignite-1093-2
Commit: fdfa62f0ff67c3d8266f862e0a0b53e065a96f91
Parents: c45d2af
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 18:40:07 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 18:40:07 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/preloader/GridDhtPartitionDemander.java  | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fdfa62f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 87a1a6b..aa7d90b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -497,15 +497,9 @@ public class GridDhtPartitionDemander {
 
         final SyncFuture fut = syncFut;
 
-        if (!fut.topologyVersion().equals(topVer))
+        if (!fut.topologyVersion().equals(topVer)) // Topology change will be checked in the loop.
             return;
 
-        if (topologyChanged(topVer)) {
-            fut.cancel();
-
-            return;
-        }
-
         ClusterNode node = cctx.node(id);
 
         assert node != null;