You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/17 13:31:24 UTC

incubator-ignite git commit: ignite-1093

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1093 64319443a -> 9da4b9367


ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9da4b936
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9da4b936
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9da4b936

Branch: refs/heads/ignite-1093
Commit: 9da4b936795813b1786a7c0c5ed2abe87f504e09
Parents: 6431944
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Mon Aug 17 12:38:11 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Mon Aug 17 12:38:11 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 13 +++-
 ...GridCacheMassiveRebalancingSyncSelfTest.java | 81 +++++++++++++++-----
 2 files changed, 72 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9da4b936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 262ccb7..6d024de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -255,6 +255,14 @@ public class GridDhtPartitionDemander {
     }
 
     /**
+     * @param type Event type.
+     * @param discoEvt Discovery event.
+     */
+    private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+        preloadEvent(-1, type, discoEvt);
+    }
+
+    /**
      * @param part Partition.
      * @param type Type.
      * @param discoEvt Discovery event.
@@ -796,7 +804,10 @@ public class GridDhtPartitionDemander {
 
                 missed.clear();
 
-                cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary?
+                cctx.shared().exchange().scheduleResendPartitions();
+
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && !cctx.isReplicated())
+                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
 
                 onDone(cancelled);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9da4b936/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index cd12954..91352ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -57,7 +58,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
 
-        if (getTestGridName(3).equals(gridName))
+        if (getTestGridName(10).equals(gridName))
             iCfg.setClientMode(true);
 
         cacheCfg.setName(CACHE_NAME_DHT);
@@ -141,45 +142,83 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
 
         long start = System.currentTimeMillis();
 
+        // These nodes will be started simultaneously in case of ASYNC mode.
         startGrid(1);
         startGrid(2);
+        startGrid(3);
+        startGrid(4);
 
-        IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-        f2.get();
+        GridCachePreloader p1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        GridCachePreloader p2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        GridCachePreloader p3 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        GridCachePreloader p4 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
 
-        IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-        while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion())) {
+        IgniteInternalFuture f4 = p4.syncFuture();
+        f4.get();
+
+        AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f4).topologyVersion();
+
+        IgniteInternalFuture f1 = p1.syncFuture();
+        IgniteInternalFuture f2 = p2.syncFuture();
+        IgniteInternalFuture f3 = p3.syncFuture();
+
+        while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(f4Top) ||
+            !((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion().equals(f4Top) ||
+            !((GridDhtPartitionDemander.SyncFuture)f3).topologyVersion().equals(f4Top)) {
             U.sleep(100);
 
-            f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+            f1 = p1.syncFuture();
+            f2 = p2.syncFuture();
+            f3 = p3.syncFuture();
         }
         f1.get();
+        f2.get();
+        f3.get();
 
         long spend = (System.currentTimeMillis() - start) / 1000;
 
-        f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-        f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+        f1 = p1.syncFuture();
+        f2 = p2.syncFuture();
+        f3 = p3.syncFuture();
+        f4 = p4.syncFuture();
 
         stopGrid(0);
 
-        //TODO: refactor to get futures by topology
-        while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() ||
-            f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+        while (f1 == p1.syncFuture() || f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture())
             U.sleep(100);
 
-        ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
-        ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+        p1.syncFuture().get();
+        p2.syncFuture().get();
+        p3.syncFuture().get();
+        p4.syncFuture().get();
 
-        f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+        f2 = p2.syncFuture();
+        f3 = p3.syncFuture();
+        f4 = p4.syncFuture();
 
         stopGrid(1);
 
-        while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+        while (f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture())
             U.sleep(100);
 
-        ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+        p2.syncFuture().get();
+        p3.syncFuture().get();
+        p4.syncFuture().get();
 
-        checkData(grid(2));
+        f3 = p3.syncFuture();
+        f4 = p4.syncFuture();
+
+        stopGrid(2);
+
+        while (f3 == p3.syncFuture() || f4 == p4.syncFuture())
+            U.sleep(100);
+
+        p3.syncFuture().get();
+        p4.syncFuture().get();
+
+        stopGrid(3);
+
+        checkData(grid(4));
 
         log.info("Spend " + spend + " seconds to preload entries.");
 
@@ -198,7 +237,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
 
         startGrid(1);
         startGrid(2);
-        startGrid(3);
+        startGrid(10);
 
         Thread t = new Thread(new Runnable() {
             @Override public void run() {
@@ -214,10 +253,10 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
                         int size = 1000;
 
                         for (int i = 0; i < size; i++)
-                            grid(3).cachex(CACHE_NAME_DHT).remove(i);
+                            grid(10).cachex(CACHE_NAME_DHT).remove(i);
 
                         for (int i = 0; i < size; i++)
-                            grid(3).cachex(CACHE_NAME_DHT).put(i, i);
+                            grid(10).cachex(CACHE_NAME_DHT).put(i, i);
 
                         spend += System.currentTimeMillis() - start;
 
@@ -245,7 +284,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
         cancelled.set(true);
         t.join();
 
-        checkData(grid(3));
+        checkData(grid(10));
 
         //stopAllGrids();
     }