You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/14 18:38:24 UTC

[30/34] incubator-ignite git commit: ignite-1093

ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8f36482b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8f36482b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8f36482b

Branch: refs/heads/ignite-1093
Commit: 8f36482b33d05d20a065d2b3684d82ab7559b902
Parents: 50e188d
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Wed Aug 12 12:42:48 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Wed Aug 12 12:42:48 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 168 ++++++++-----------
 .../dht/preloader/GridDhtPreloader.java         |  19 +--
 .../GridCacheMassiveRebalancingSelfTest.java    |  19 ++-
 3 files changed, 90 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f36482b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index f6a33c3..6c95707 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -305,6 +306,22 @@ public class GridDhtPartitionDemander {
 
                 ClusterNode node = e.getKey();
 
+                final long start = U.currentTimeMillis();
+
+                final CacheConfiguration cfg = cctx.config();
+
+                if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
+                    U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                        ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + "]");
+
+                    syncFut.listen(new CI1<Object>() {
+                        @Override public void apply(Object t) {
+                            U.log(log, "Completed rebalancing [cache=" + cctx.name() + ", mode="
+                                + cfg.getRebalanceMode() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+                        }
+                    });
+                }
+
                 GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
 
                 remainings.addAll(d.partitions());
@@ -342,50 +359,6 @@ public class GridDhtPartitionDemander {
                         }
                     }
                 }
-
-                if (log.isInfoEnabled() && !d.partitions().isEmpty()) {
-                    LinkedList<Integer> s = new LinkedList<>(d.partitions());
-
-                    Collections.sort(s);
-
-                    StringBuilder sb = new StringBuilder();
-
-                    int start = -1;
-
-                    int prev = -1;
-
-                    Iterator<Integer> sit = s.iterator();
-
-                    while (sit.hasNext()) {
-                        int p = sit.next();
-                        if (start == -1) {
-                            start = p;
-                            prev = p;
-                        }
-
-                        if (prev < p - 1) {
-                            sb.append(start);
-
-                            if (start != prev)
-                                sb.append("-").append(prev);
-
-                            sb.append(", ");
-
-                            start = p;
-                        }
-
-                        if (!sit.hasNext()) {
-                            sb.append(start);
-
-                            if (start != p)
-                                sb.append("-").append(p);
-                        }
-
-                        prev = p;
-                    }
-
-                    log.info("Requested rebalancing [from node=" + node.id() + ", partitions=" + s.size() + " (" + sb.toString() + ")]");
-                }
             }
         }
         else if (delay > 0) {
@@ -659,82 +632,83 @@ public class GridDhtPartitionDemander {
     void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
         lastExchangeFut = lastFut;
     }
-/**
- *
- */
-private class SyncFuture extends GridFutureAdapter<Object> {
-    /** */
-    private static final long serialVersionUID = 1L;
-
-    private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
 
-    private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
-
-    public void append(UUID nodeId, Collection<Integer> parts) {
-        remaining.put(nodeId, parts);
-
-        missed.put(nodeId, new GridConcurrentHashSet<Integer>());
-    }
+    /**
+     *
+     */
+    private class SyncFuture extends GridFutureAdapter<Object> {
+        /** */
+        private static final long serialVersionUID = 1L;
 
-    void cancel(UUID nodeId) {
-        if (isDone())
-            return;
+        private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
 
-        remaining.remove(nodeId);
+        private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
 
-        checkIsDone();
-    }
+        public void append(UUID nodeId, Collection<Integer> parts) {
+            remaining.put(nodeId, parts);
 
-    void onMissedPartition(UUID nodeId, int p) {
-        if (missed.get(nodeId) == null)
             missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+        }
 
-        missed.get(nodeId).add(p);
-   }
-
-    void onPartitionDone(UUID nodeId, int p) {
-        if (isDone())
-            return;
+        void cancel(UUID nodeId) {
+            if (isDone())
+                return;
 
-        Collection<Integer> parts = remaining.get(nodeId);
+            remaining.remove(nodeId);
 
-        parts.remove(p);
+            checkIsDone();
+        }
 
-        if (parts.isEmpty()) {
-            remaining.remove(nodeId);
+        void onMissedPartition(UUID nodeId, int p) {
+            if (missed.get(nodeId) == null)
+                missed.put(nodeId, new GridConcurrentHashSet<Integer>());
 
-            if (log.isDebugEnabled())
-                log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']');
+            missed.get(nodeId).add(p);
         }
 
-        checkIsDone();
-    }
+        void onPartitionDone(UUID nodeId, int p) {
+            if (isDone())
+                return;
 
-    private void checkIsDone() {
-        if (remaining.isEmpty()) {
-            if (log.isDebugEnabled())
-                log.debug("Completed sync future.");
+            Collection<Integer> parts = remaining.get(nodeId);
+
+            parts.remove(p);
 
-            Collection<Integer> m = new HashSet<>();
+            if (parts.isEmpty()) {
+                remaining.remove(nodeId);
 
-            for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
-                if (e.getValue() != null && !e.getValue().isEmpty())
-                    m.addAll(e.getValue());
+                if (log.isDebugEnabled())
+                    log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']');
             }
 
-            if (!m.isEmpty()) {
+            checkIsDone();
+        }
+
+        private void checkIsDone() {
+            if (remaining.isEmpty()) {
                 if (log.isDebugEnabled())
-                    log.debug("Reassigning partitions that were missed: " + m);
+                    log.debug("Completed sync future.");
 
-                cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
-            }
+                Collection<Integer> m = new HashSet<>();
 
-            missed.clear();
+                for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+                    if (e.getValue() != null && !e.getValue().isEmpty())
+                        m.addAll(e.getValue());
+                }
+
+                if (!m.isEmpty()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Reassigning partitions that were missed: " + m);
+
+                    cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+                }
+
+                missed.clear();
 
-            cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary?
+                cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary?
 
-            onDone();
+                onDone();
+            }
         }
     }
 }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f36482b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d994a19..7f99ebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -220,24 +219,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
-        if (err == null) {
+        if (err == null)
             startFut.onDone();
-
-            final long start = U.currentTimeMillis();
-
-            final CacheConfiguration cfg = cctx.config();
-
-            if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
-                U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
-
-                demander.syncFuture().listen(new CI1<Object>() {
-                    @Override public void apply(Object t) {
-                        U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
-                            "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
-                    }
-                });
-            }
-        }
         else
             startFut.onDone(err);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f36482b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index 5148753..ca95905 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -113,12 +115,27 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         long spend = (System.currentTimeMillis() - start) / 1000;
 
+        IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+        IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
         stopGrid(0);
 
-        Thread.sleep(20000);
+        while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() ||
+            f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+            U.sleep(100);
+
+        ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+        ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+
+        f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
 
         stopGrid(1);
 
+        while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+            U.sleep(100);
+
+        ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+
         checkData(grid(2));
 
         log.info("Spend " + spend + " seconds to preload entries.");