You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 13:13:34 UTC

[07/16] ignite git commit: 1093

1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d51f61c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d51f61c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d51f61c

Branch: refs/heads/ignite-1093-2
Commit: 7d51f61c29a96475e6662484a6dae474b3f8f609
Parents: ff0e2e1
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 12:18:43 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 12:18:43 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  2 +-
 .../dht/preloader/GridDhtPartitionDemander.java |  1 +
 .../dht/preloader/GridDhtPartitionSupplier.java |  2 +
 .../GridCacheRebalancingSyncSelfTest.java       | 48 +++++++++++---------
 4 files changed, 30 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index e77b540..92d9ab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1084,7 +1084,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
      * @return {@code this} for chaining.
      */
     public CacheConfiguration<K, V> setRebalanceBatchSize(int rebalanceBatchSize) {
-        this.rebalanceBatchSize = rebalanceBatchSize;
+        this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize);
 
         return this;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fbe57dc..3c5a2f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -409,6 +409,7 @@ public class GridDhtPartitionDemander {
                         GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
                         initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
+
                         try {
                             if (!topologyChanged(topVer))
                                 cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index fb9f796..ee01158 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -509,6 +509,8 @@ class GridDhtPartitionSupplier {
                 }
             }
 
+            scMap.remove(scId);
+
             reply(node, d, s);
         }
         catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 3366381..39f5d4b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -95,6 +95,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         cachePCfg.setCacheMode(CacheMode.PARTITIONED);
         cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cachePCfg.setBackups(1);
+        cachePCfg.setRebalanceBatchSize(1);
+        cachePCfg.setRebalanceBatchesCount(1);
 
         CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>();
 
@@ -108,6 +110,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED);
         cacheRCfg.setCacheMode(CacheMode.REPLICATED);
         cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cachePCfg.setRebalanceBatchSize(1);
+        cachePCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
 
         CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>();
 
@@ -123,19 +127,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     /**
      * @param ignite Ignite.
      */
-    protected void generateData(Ignite ignite) {
-        generateData(ignite, CACHE_NAME_DHT_PARTITIONED);
-        generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
-        generateData(ignite, CACHE_NAME_DHT_REPLICATED);
-        generateData(ignite, CACHE_NAME_DHT_REPLICATED_2);
+    protected void generateData(Ignite ignite, int from) {
+        generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from);
+        generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from);
+        generateData(ignite, CACHE_NAME_DHT_REPLICATED, from);
+        generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from);
     }
 
     /**
      * @param ignite Ignite.
      */
-    protected void generateData(Ignite ignite, String name) {
+    protected void generateData(Ignite ignite, String name, int from) {
         try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(name)) {
-            for (int i = 0; i < TEST_SIZE; i++) {
+            for (int i = from; i < from + TEST_SIZE; i++) {
                 if (i % (TEST_SIZE / 10) == 0)
                     log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
 
@@ -150,11 +154,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
      * @param ignite Ignite.
      * @throws IgniteCheckedException Exception.
      */
-    protected void checkData(Ignite ignite) throws IgniteCheckedException {
-        checkData(ignite, CACHE_NAME_DHT_PARTITIONED);
-        checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
-        checkData(ignite, CACHE_NAME_DHT_REPLICATED);
-        checkData(ignite, CACHE_NAME_DHT_REPLICATED_2);
+    protected void checkData(Ignite ignite, int from) throws IgniteCheckedException {
+        checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from);
+        checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from);
+        checkData(ignite, CACHE_NAME_DHT_REPLICATED, from);
+        checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from);
     }
 
     /**
@@ -162,13 +166,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
      * @param name Cache name.
      * @throws IgniteCheckedException Exception.
      */
-    protected void checkData(Ignite ignite, String name) throws IgniteCheckedException {
-        for (int i = 0; i < TEST_SIZE; i++) {
+    protected void checkData(Ignite ignite, String name, int from) throws IgniteCheckedException {
+        for (int i = from; i < from + TEST_SIZE; i++) {
             if (i % (TEST_SIZE / 10) == 0)
                 log.info("Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
 
             assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) :
-                "value " + i + name.hashCode() + " does not match (" + ignite.cache(name).get(i) + ")";
+                "value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")";
         }
     }
 
@@ -178,7 +182,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     public void testSimpleRebalancing() throws Exception {
         Ignite ignite = startGrid(0);
 
-        generateData(ignite);
+        generateData(ignite, 0);
 
         log.info("Preloading started.");
 
@@ -204,7 +208,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         long spend = (System.currentTimeMillis() - start) / 1000;
 
-        checkData(grid(1));
+        checkData(grid(1), 0);
 
         log.info("Spend " + spend + " seconds to rebalance entries.");
 
@@ -260,7 +264,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     public void testComplexRebalancing() throws Exception {
         Ignite ignite = startGrid(0);
 
-        generateData(ignite);
+        generateData(ignite, 0);
 
         log.info("Preloading started.");
 
@@ -351,7 +355,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         long spend = (System.currentTimeMillis() - start) / 1000;
 
-        checkData(grid(4));
+        checkData(grid(4), 0);
 
         log.info("Spend " + spend + " seconds to rebalance entries.");
 
@@ -370,7 +374,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         ((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(map);
 
-        generateData(ignite);
+        generateData(ignite, 0);
 
         startGrid(1);
 
@@ -378,7 +382,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         stopGrid(0);
 
-        checkData(grid(1));
+        checkData(grid(1), 0);
 
         stopAllGrids();
     }
@@ -389,7 +393,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     public void testNodeFailedAtRebalancing() throws Exception {
         Ignite ignite = startGrid(0);
 
-        generateData(ignite);
+        generateData(ignite, 0);
 
         log.info("Preloading started.");