You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/21 15:36:18 UTC

[1/2] incubator-ignite git commit: ignite-1093

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1093 a483f5220 -> dc10b85cc


ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a5bd80de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a5bd80de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a5bd80de

Branch: refs/heads/ignite-1093
Commit: a5bd80ded53be77ff568f5c9edbbeddea91d1aff
Parents: a483f52
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 21 15:08:39 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 21 15:08:39 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java |   5 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |   7 +-
 ...GridCacheMassiveRebalancingSyncSelfTest.java | 202 ++++++++++++++-----
 3 files changed, 157 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e11addc..0c30630 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -291,7 +291,8 @@ public class GridDhtPartitionDemander {
             AffinityTopologyVersion topVer = assigns.topologyVersion();
 
             if (syncFut.isInited()) {
-                syncFut.get();
+                if (!syncFut.isDone())
+                    syncFut.onCancel();
 
                 syncFut = new SyncFuture(assigns);
             }
@@ -791,7 +792,7 @@ public class GridDhtPartitionDemander {
 
             Collection<Integer> parts = remaining.get(nodeId);
 
-            if (parts!=null) {
+            if (parts != null) {
                 parts.remove(p);
 
                 if (parts.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index c496f8d..546e67b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -81,15 +81,13 @@ class GridDhtPartitionSupplier {
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
-                final int idx = cnt;
-
                 cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
                     @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
                         if (!enterBusy())
                             return;
 
                         try {
-                            processMessage(m, id, idx);
+                            processMessage(m, id);
                         }
                         finally {
                             leaveBusy();
@@ -161,9 +159,8 @@ class GridDhtPartitionSupplier {
     /**
      * @param d Demand message.
      * @param id Node uuid.
-     * @param idx Index.
      */
-    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
+    private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
         assert d != null;
         assert id != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index cc82e79..d92ec86 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -39,11 +39,14 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
     /** */
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 1_024_000;
+    private static int TEST_SIZE = 1_120_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";
 
+    /** cache 2 name. */
+    protected static String CACHE_2_NAME_DHT = "cache2";
+
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return Long.MAX_VALUE;
@@ -53,24 +56,33 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration iCfg = super.getConfiguration(gridName);
 
-        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
-
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
 
         if (getTestGridName(10).equals(gridName))
             iCfg.setClientMode(true);
 
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
         cacheCfg.setName(CACHE_NAME_DHT);
         cacheCfg.setCacheMode(CacheMode.PARTITIONED);
         //cacheCfg.setRebalanceBatchSize(1024);
         //cacheCfg.setRebalanceBatchesCount(1);
         cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cacheCfg.setRebalanceThreadPoolSize(4);
-        //cacheCfg.setRebalanceTimeout(1000000);
+        cacheCfg.setRebalanceThreadPoolSize(8);
         cacheCfg.setBackups(1);
 
-        iCfg.setCacheConfiguration(cacheCfg);
+        CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>();
+
+        cacheCfg2.setName(CACHE_2_NAME_DHT);
+        cacheCfg2.setCacheMode(CacheMode.PARTITIONED);
+        //cacheCfg2.setRebalanceBatchSize(1024);
+        //cacheCfg2.setRebalanceBatchesCount(1);
+        cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg2.setRebalanceThreadPoolSize(8);
+        cacheCfg2.setBackups(1);
+
+        iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);
         return iCfg;
     }
 
@@ -86,6 +98,14 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
                 stmr.addData(i, i);
             }
         }
+        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) {
+            for (int i = 0; i < TEST_SIZE; i++) {
+                if (i % 1_000_000 == 0)
+                    log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+                stmr.addData(i, i + 3);
+            }
+        }
     }
 
     /**
@@ -97,7 +117,15 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
             if (i % 1_000_000 == 0)
                 log.info("Checked " + i / 1_000_000 + "m entries.");
 
-            assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
+            assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) :
+                "keys " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")";
+        }
+        for (int i = 0; i < TEST_SIZE; i++) {
+            if (i % 1_000_000 == 0)
+                log.info("Checked " + i / 1_000_000 + "m entries.");
+
+            assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) :
+                "keys " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")";
         }
     }
 
@@ -125,7 +153,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
 
         checkData(grid(1));
 
-        log.info("Spend " + spend + " seconds to preload entries.");
+        log.info("Spend " + spend + " seconds to rebalance entries.");
 
         stopAllGrids();
     }
@@ -148,79 +176,153 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
         startGrid(3);
         startGrid(4);
 
-        GridCachePreloader p1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-        GridCachePreloader p2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-        GridCachePreloader p3 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-        GridCachePreloader p4 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        //wait until cache rebalanced in async mode
 
-        IgniteInternalFuture f4 = p4.syncFuture();
-        f4.get();
+        GridCachePreloader p11 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        GridCachePreloader p12 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        GridCachePreloader p13 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        GridCachePreloader p14 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
 
-        AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f4).topologyVersion();
+        GridCachePreloader p21 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
+        GridCachePreloader p22 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
+        GridCachePreloader p23 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
+        GridCachePreloader p24 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
 
-        IgniteInternalFuture f1 = p1.syncFuture();
-        IgniteInternalFuture f2 = p2.syncFuture();
-        IgniteInternalFuture f3 = p3.syncFuture();
+        IgniteInternalFuture f24 = p24.syncFuture();
+        f24.get();
 
-        while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(f4Top) ||
-            !((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion().equals(f4Top) ||
-            !((GridDhtPartitionDemander.SyncFuture)f3).topologyVersion().equals(f4Top)) {
+        IgniteInternalFuture f14 = p14.syncFuture();
+        f14.get();
+
+        AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f24).topologyVersion();
+
+        IgniteInternalFuture f11 = p11.syncFuture();
+        IgniteInternalFuture f12 = p12.syncFuture();
+        IgniteInternalFuture f13 = p13.syncFuture();
+
+        while (!((GridDhtPartitionDemander.SyncFuture)f11).topologyVersion().equals(f4Top) ||
+            !((GridDhtPartitionDemander.SyncFuture)f12).topologyVersion().equals(f4Top) ||
+            !((GridDhtPartitionDemander.SyncFuture)f13).topologyVersion().equals(f4Top)) {
             U.sleep(100);
 
-            f1 = p1.syncFuture();
-            f2 = p2.syncFuture();
-            f3 = p3.syncFuture();
+            f11 = p11.syncFuture();
+            f12 = p12.syncFuture();
+            f13 = p13.syncFuture();
         }
-        f1.get();
-        f2.get();
-        f3.get();
+        f11.get();
+        f12.get();
+        f13.get();
 
-        long spend = (System.currentTimeMillis() - start) / 1000;
+        IgniteInternalFuture f21 = p21.syncFuture();
+        IgniteInternalFuture f22 = p22.syncFuture();
+        IgniteInternalFuture f23 = p23.syncFuture();
+
+        while (!((GridDhtPartitionDemander.SyncFuture)f21).topologyVersion().equals(f4Top) ||
+            !((GridDhtPartitionDemander.SyncFuture)f22).topologyVersion().equals(f4Top) ||
+            !((GridDhtPartitionDemander.SyncFuture)f23).topologyVersion().equals(f4Top)) {
+            U.sleep(100);
+
+            f21 = p21.syncFuture();
+            f22 = p22.syncFuture();
+            f23 = p23.syncFuture();
+        }
+        f21.get();
+        f22.get();
+        f23.get();
 
-        f1 = p1.syncFuture();
-        f2 = p2.syncFuture();
-        f3 = p3.syncFuture();
-        f4 = p4.syncFuture();
+        //cache rebalanced in async node
+
+        f11 = p11.syncFuture();
+        f12 = p12.syncFuture();
+        f13 = p13.syncFuture();
+        f14 = p14.syncFuture();
+
+        f21 = p21.syncFuture();
+        f22 = p22.syncFuture();
+        f23 = p23.syncFuture();
+        f24 = p24.syncFuture();
 
         stopGrid(0);
 
-        while (f1 == p1.syncFuture() || f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture())
+        //wait until cache rebalanced
+
+        while (f11 == p11.syncFuture() || f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture())
             U.sleep(100);
 
-        p1.syncFuture().get();
-        p2.syncFuture().get();
-        p3.syncFuture().get();
-        p4.syncFuture().get();
+        while (f21 == p21.syncFuture() || f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture())
+            U.sleep(100);
+
+        p11.syncFuture().get();
+        p12.syncFuture().get();
+        p13.syncFuture().get();
+        p14.syncFuture().get();
+
+        p21.syncFuture().get();
+        p22.syncFuture().get();
+        p23.syncFuture().get();
+        p24.syncFuture().get();
 
-        f2 = p2.syncFuture();
-        f3 = p3.syncFuture();
-        f4 = p4.syncFuture();
+        //cache rebalanced
+
+        f12 = p12.syncFuture();
+        f13 = p13.syncFuture();
+        f14 = p14.syncFuture();
+
+        f22 = p22.syncFuture();
+        f23 = p23.syncFuture();
+        f24 = p24.syncFuture();
 
         stopGrid(1);
 
-        while (f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture())
+        //wait until cache rebalanced
+
+        while (f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture())
             U.sleep(100);
 
-        p2.syncFuture().get();
-        p3.syncFuture().get();
-        p4.syncFuture().get();
+        while (f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture())
+            U.sleep(100);
+
+        p12.syncFuture().get();
+        p13.syncFuture().get();
+        p14.syncFuture().get();
+
+        p22.syncFuture().get();
+        p23.syncFuture().get();
+        p24.syncFuture().get();
 
-        f3 = p3.syncFuture();
-        f4 = p4.syncFuture();
+        //cache rebalanced
+
+        f13 = p13.syncFuture();
+        f14 = p14.syncFuture();
+
+        f23 = p23.syncFuture();
+        f24 = p24.syncFuture();
 
         stopGrid(2);
 
-        while (f3 == p3.syncFuture() || f4 == p4.syncFuture())
+        //wait until cache rebalanced
+
+        while (f13 == p13.syncFuture() || f14 == p14.syncFuture())
             U.sleep(100);
 
-        p3.syncFuture().get();
-        p4.syncFuture().get();
+        while (f23 == p23.syncFuture() || f24 == p24.syncFuture())
+            U.sleep(100);
+
+        p13.syncFuture().get();
+        p14.syncFuture().get();
+
+        p23.syncFuture().get();
+        p24.syncFuture().get();
+
+        //cache rebalanced
 
         stopGrid(3);
 
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
         checkData(grid(4));
 
-        log.info("Spend " + spend + " seconds to preload entries.");
+        log.info("Spend " + spend + " seconds to rebalance entries.");
 
         stopAllGrids();
     }


[2/2] incubator-ignite git commit: ignite-1093

Posted by av...@apache.org.
ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dc10b85c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dc10b85c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dc10b85c

Branch: refs/heads/ignite-1093
Commit: dc10b85ccc83b1f2999685a10b1997e3e4f5d18b
Parents: a5bd80d
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 21 16:35:49 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 21 16:35:49 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/preloader/GridDhtPartitionDemander.java     | 5 ++++-
 .../rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java    | 4 ++--
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc10b85c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0c30630..126bde9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -339,7 +339,10 @@ public class GridDhtPartitionDemander {
 
                 syncFut.append(node.id(), remainings);
 
-                int lsnrCnt = cctx.config().getRebalanceThreadPoolSize();
+                int lsnrCnt = cctx.config().getRebalanceThreadPoolSize() / assigns.entrySet().size();
+
+                if (lsnrCnt == 0)
+                    lsnrCnt = 1;//At least one listener per node.
 
                 List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc10b85c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index d92ec86..1182254 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -69,7 +69,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
         //cacheCfg.setRebalanceBatchSize(1024);
         //cacheCfg.setRebalanceBatchesCount(1);
         cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cacheCfg.setRebalanceThreadPoolSize(8);
+        //cacheCfg.setRebalanceThreadPoolSize(8);
         cacheCfg.setBackups(1);
 
         CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>();
@@ -79,7 +79,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
         //cacheCfg2.setRebalanceBatchSize(1024);
         //cacheCfg2.setRebalanceBatchesCount(1);
         cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cacheCfg2.setRebalanceThreadPoolSize(8);
+        //cacheCfg2.setRebalanceThreadPoolSize(8);
         cacheCfg2.setBackups(1);
 
         iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);