You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/25 13:24:47 UTC

ignite git commit: ignite-1093

Repository: ignite
Updated Branches:
  refs/heads/ignite-1093 96e9a97eb -> 933fd6fdd


ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/933fd6fd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/933fd6fd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/933fd6fd

Branch: refs/heads/ignite-1093
Commit: 933fd6fdd2ea0807dceaf158fd2fe81c89245923
Parents: 96e9a97
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 25 14:24:37 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 25 14:24:37 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 263 +++++++++++--------
 ...GridCacheMassiveRebalancingSyncSelfTest.java |   2 +-
 2 files changed, 154 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/933fd6fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 9d244ef..a5f8226 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
 import org.jsr166.*;
 
 import java.util.*;
@@ -91,50 +92,6 @@ public class GridDhtPartitionDemander {
      *
      */
     void start() {
-        int rebalanceOrder = cctx.config().getRebalanceOrder();
-
-        if (!CU.isMarshallerCache(cctx.name())) {
-            if (log.isDebugEnabled())
-                log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
-
-            try {
-                cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
-            }
-            catch (IgniteInterruptedCheckedException ignored) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
-                        "[cacheName=" + cctx.name() + ']');
-
-                return;
-            }
-            catch (IgniteCheckedException e) {
-                throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
-            }
-        }
-
-        if (rebalanceOrder > 0) {
-            IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
-
-            try {
-                if (fut != null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
-                            ", rebalanceOrder=" + rebalanceOrder + ']');
-
-                    fut.get();
-                }
-            }
-            catch (IgniteInterruptedCheckedException ignored) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
-                        "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-
-                return;
-            }
-            catch (IgniteCheckedException e) {
-                throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
-            }
-        }
     }
 
     /**
@@ -227,92 +184,94 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             assert assigns != null;
 
-            AffinityTopologyVersion topVer = assigns.topologyVersion();
+            final AffinityTopologyVersion topVer = assigns.topologyVersion();
 
-            if (syncFut.isInited()) {
-                if (!syncFut.isDone())
-                    syncFut.onCancel();
+            SyncFuture curSyncFut = syncFut;
 
-                syncFut = new SyncFuture(assigns);
+            if (curSyncFut.isInited()) {
+                if (!curSyncFut.isDone())
+                    curSyncFut.onCancel();
+
+                curSyncFut = new SyncFuture(assigns);
+
+                syncFut = curSyncFut;
             }
             else
-                syncFut.init(assigns);
+                curSyncFut.init(assigns);
 
             if (assigns.isEmpty() || topologyChanged(topVer)) {
-                syncFut.onCancel();
+                curSyncFut.onCancel();
 
                 return;
             }
 
-            for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
-                GridDhtPartitionDemandMessage d = e.getValue();
-
-                d.timeout(cctx.config().getRebalanceTimeout());
-                d.workerId(0);//old api support.
+            final SyncFuture cSF = curSyncFut;
 
-                final ClusterNode node = e.getKey();
-
-                final long start = U.currentTimeMillis();
-
-                final CacheConfiguration cfg = cctx.config();
+            new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
+                @Override public void run() {
+                    if (!CU.isMarshallerCache(cctx.name())) {
+                        if (log.isDebugEnabled())
+                            log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
 
-                final AffinityTopologyVersion top = d.topologyVersion();
+                        try {
+                            IgniteInternalFuture fut = cctx.kernalContext().cache().marshallerCache().preloader().syncFuture();
 
-                if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
-                    U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
-                        ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+                            if (!topologyChanged(topVer))
+                                fut.get();
+                            else {
+                                cSF.onCancel();
 
-                    syncFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                        @Override public void apply(IgniteInternalFuture<Boolean> t) {
-                            Boolean cancelled = ((SyncFuture)t).cancelled();
-                            U.log(log, (cancelled ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode="
-                                + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top +
-                                ", time=" + (U.currentTimeMillis() - start) + " ms]");
+                                return;
+                            }
                         }
-                    });
-                }
-
-                GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
-
-                remainings.addAll(d.partitions());
+                        catch (IgniteInterruptedCheckedException ignored) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
+                                    "[cacheName=" + cctx.name() + ']');
 
-                syncFut.append(node.id(), remainings);
-
-                int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
-
-                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
-
-                for (int cnt = 0; cnt < lsnrCnt; cnt++)
-                    sParts.add(new HashSet<Integer>());
-
-                Iterator<Integer> it = d.partitions().iterator();
-
-                int cnt = 0;
+                            return;
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
+                        }
+                    }
 
-                while (it.hasNext())
-                    sParts.get(cnt++ % lsnrCnt).add(it.next());
+                    int rebalanceOrder = cctx.config().getRebalanceOrder();
 
-                for (cnt = 0; cnt < lsnrCnt; cnt++) {
+                    if (rebalanceOrder > 0) {
+                        IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
 
-                    if (!sParts.get(cnt).isEmpty()) {
+                        try {
+                            if (fut != null) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
+                                        ", rebalanceOrder=" + rebalanceOrder + ']');
 
-                        // Create copy.
-                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+                                if (!topologyChanged(topVer))
+                                    fut.get();
+                                else {
+                                    cSF.onCancel();
 
-                        initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
+                                    return;
+                                }
+                            }
+                        }
+                        catch (IgniteInterruptedCheckedException ignored) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
+                                    "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
 
-                        try {
-                            cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+                            return;
                         }
-                        catch (IgniteCheckedException ex) {
-                            U.error(log, "Failed to send partition demand message to local node", ex);
+                        catch (IgniteCheckedException e) {
+                            throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
                         }
-
-                        if (log.isDebugEnabled())
-                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
                     }
+
+                    requestPartitions(assigns, topVer, cSF);
                 }
-            }
+            }).start();
+
         }
         else if (delay > 0) {
             GridTimeoutObject obj = lastTimeoutObj.get();
@@ -341,6 +300,90 @@ public class GridDhtPartitionDemander {
     }
 
     /**
+     * @param assigns Assigns.
+     */
+    private void requestPartitions(
+        final GridDhtPreloaderAssignments assigns,
+        AffinityTopologyVersion topVer,
+        SyncFuture fut) {
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+            if (topologyChanged(topVer)) {
+                fut.onCancel();
+
+                return;
+            }
+
+            GridDhtPartitionDemandMessage d = e.getValue();
+
+            d.timeout(cctx.config().getRebalanceTimeout());
+            d.workerId(0);//old api support.
+
+            final ClusterNode node = e.getKey();
+
+            final long start = U.currentTimeMillis();
+
+            final CacheConfiguration cfg = cctx.config();
+
+            final AffinityTopologyVersion top = d.topologyVersion();
+
+            if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
+                U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                    ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+
+                syncFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> t) {
+                        Boolean completed = ((SyncFuture)t).isCompleted();
+                        U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode="
+                            + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top +
+                            ", time=" + (U.currentTimeMillis() - start) + " ms]");
+                    }
+                });
+            }
+
+            GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
+
+            remainings.addAll(d.partitions());
+
+            syncFut.append(node.id(), remainings);
+
+            int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
+
+            List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+
+            for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                sParts.add(new HashSet<Integer>());
+
+            Iterator<Integer> it = d.partitions().iterator();
+
+            int cnt = 0;
+
+            while (it.hasNext())
+                sParts.get(cnt++ % lsnrCnt).add(it.next());
+
+            for (cnt = 0; cnt < lsnrCnt; cnt++) {
+
+                if (!sParts.get(cnt).isEmpty()) {
+
+                    // Create copy.
+                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+                    initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
+
+                    try {
+                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+                    }
+                    catch (IgniteCheckedException ex) {
+                        U.error(log, "Failed to send partition demand message to local node", ex);
+                    }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
+                }
+            }
+        }
+    }
+
+    /**
      * @param c Partitions.
      * @return String representation of partitions list.
      */
@@ -647,7 +690,7 @@ public class GridDhtPartitionDemander {
         /** Assignments. */
         private volatile GridDhtPreloaderAssignments assigns;
 
-        private volatile boolean cancelled = false;
+        private volatile boolean completed = true;
 
         SyncFuture(GridDhtPreloaderAssignments assigns) {
             this.assigns = assigns;
@@ -691,7 +734,7 @@ public class GridDhtPartitionDemander {
         void onCancel() {
             remaining.clear();
 
-            cancelled = true;
+            completed = false;
 
             checkIsDone();
         }
@@ -702,13 +745,13 @@ public class GridDhtPartitionDemander {
 
             remaining.remove(nodeId);
 
-            cancelled = true;
+            completed = false;
 
             checkIsDone();
         }
 
-        boolean cancelled() {
-            return cancelled;
+        boolean isCompleted() {
+            return completed;
         }
 
         void onMissedPartition(UUID nodeId, int p, AffinityTopologyVersion topVer) {
@@ -774,7 +817,7 @@ public class GridDhtPartitionDemander {
                 if (lsnr != null)
                     cctx.events().removeListener(lsnr);
 
-                onDone(cancelled);
+                onDone(completed);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/933fd6fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index b0fc011..f69b710 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -39,7 +39,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
     /** */
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 5_000_000;
+    private static int TEST_SIZE = 1_000_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";