You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 18:59:59 UTC

[3/4] ignite git commit: 1093

1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c45d2af4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c45d2af4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c45d2af4

Branch: refs/heads/ignite-1093-2
Commit: c45d2af46e2a80f1c82724807a834f1afecc2be7
Parents: 5ce1dd0
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Sep 15 18:24:02 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Sep 15 18:24:02 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java |  2 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |  2 +-
 .../GridCacheRebalancingSyncSelfTest.java       | 67 ++++++++++++++++----
 3 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c45d2af4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 9960435..87a1a6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -193,7 +193,7 @@ public class GridDhtPartitionDemander {
      * @return {@code True} if topology changed.
      */
     private boolean topologyChanged(AffinityTopologyVersion topVer) {
-        return cctx.affinity().affinityTopologyVersion().topologyVersion() != topVer.topologyVersion();
+        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c45d2af4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 49e89ca..0686376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -104,7 +104,7 @@ class GridDhtPartitionSupplier {
         assert d != null;
         assert id != null;
 
-        if (cctx.affinity().affinityTopologyVersion().topologyVersion() != d.topologyVersion().topologyVersion())
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
             return;
 
         GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/c45d2af4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 07c3e7c..db0c8ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -117,6 +118,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         return iCfg;
     }
 
+    /**
+     * @param ignite Ignite.
+     */
     protected void generateData(Ignite ignite) {
         generateData(ignite, CACHE_NAME_DHT_PARTITIONED);
         generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
@@ -140,6 +144,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     * @param ignite Ignite.
+     * @throws IgniteCheckedException Exception.
+     */
     protected void checkData(Ignite ignite) throws IgniteCheckedException {
         checkData(ignite, CACHE_NAME_DHT_PARTITIONED);
         checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
@@ -149,7 +157,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
     /**
      * @param ignite Ignite.
-     * @throws IgniteCheckedException
+     * @param name Cache name.
+     * @throws IgniteCheckedException Exception.
      */
     protected void checkData(Ignite ignite, String name) throws IgniteCheckedException {
         for (int i = 0; i < TEST_SIZE; i++) {
@@ -162,7 +171,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception Exception
      */
     public void testSimpleRebalancing() throws Exception {
         Ignite ignite = startGrid(0);
@@ -189,10 +198,30 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param id Id.
-     * @param top Topology.
+     * @param id Node id.
+     * @param major Major ver.
+     * @param minor Minor ver.
+     * @throws IgniteCheckedException Exception.
+     */
+    protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
+        waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
+    }
+
+    /**
+     * @param id Node id.
+     * @param major Major ver.
+     * @throws IgniteCheckedException Exception.
      */
-    protected void waitForRebalancing(int id, int top) throws IgniteCheckedException {
+    protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
+        waitForRebalancing(id, new AffinityTopologyVersion(major));
+    }
+
+    /**
+     * @param id Node id.
+     * @param top Topology version.
+     * @throws IgniteCheckedException
+     */
+    protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
         boolean finished = false;
 
         while (!finished) {
@@ -200,7 +229,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
             for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
                 GridDhtPartitionDemander.SyncFuture fut = (GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture();
-                if (fut.topologyVersion().topologyVersion() != top) {
+                if (fut.topologyVersion() == null || !fut.topologyVersion().equals(top)) {
                     finished = false;
 
                     break;
@@ -229,6 +258,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                     startGrid(1);
                     startGrid(2);
 
+                    while (!concurrentStartFinished2) {
+                        U.sleep(10);
+                    }
+
+                    //New cache should start rebalancing.
+                    CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+                    cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW");
+                    cacheRCfg.setCacheMode(CacheMode.PARTITIONED);
+                    cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+                    grid(0).getOrCreateCache(cacheRCfg);
+
                     concurrentStartFinished = true;
                 }
                 catch (Exception e) {
@@ -256,10 +298,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         }
 
         //wait until cache rebalanced in async mode
-        waitForRebalancing(1, 5);
-        waitForRebalancing(2, 5);
-        waitForRebalancing(3, 5);
-        waitForRebalancing(4, 5);
+
+        waitForRebalancing(1, 5, 1);
+        waitForRebalancing(2, 5, 1);
+        waitForRebalancing(3, 5, 1);
+        waitForRebalancing(4, 5, 1);
 
         //cache rebalanced in async node
 
@@ -302,7 +345,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception Exception.
      */
     public void testBackwardCompatibility() throws Exception {
         Ignite ignite = startGrid(0);
@@ -327,7 +370,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception Exception.
      */
     public void testNodeFailedAtRebalancing() throws Exception {
         Ignite ignite = startGrid(0);