You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 13:13:34 UTC
[07/16] ignite git commit: 1093
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d51f61c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d51f61c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d51f61c
Branch: refs/heads/ignite-1093-2
Commit: 7d51f61c29a96475e6662484a6dae474b3f8f609
Parents: ff0e2e1
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Sep 18 12:18:43 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Sep 18 12:18:43 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 2 +-
.../dht/preloader/GridDhtPartitionDemander.java | 1 +
.../dht/preloader/GridDhtPartitionSupplier.java | 2 +
.../GridCacheRebalancingSyncSelfTest.java | 48 +++++++++++---------
4 files changed, 30 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index e77b540..92d9ab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1084,7 +1084,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
* @return {@code this} for chaining.
*/
public CacheConfiguration<K, V> setRebalanceBatchSize(int rebalanceBatchSize) {
- this.rebalanceBatchSize = rebalanceBatchSize;
+ this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize);
return this;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fbe57dc..3c5a2f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -409,6 +409,7 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
+
try {
if (!topologyChanged(topVer))
cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index fb9f796..ee01158 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -509,6 +509,8 @@ class GridDhtPartitionSupplier {
}
}
+ scMap.remove(scId);
+
reply(node, d, s);
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 3366381..39f5d4b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -95,6 +95,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
cachePCfg.setCacheMode(CacheMode.PARTITIONED);
cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
cachePCfg.setBackups(1);
+ cachePCfg.setRebalanceBatchSize(1);
+ cachePCfg.setRebalanceBatchesCount(1);
CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>();
@@ -108,6 +110,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED);
cacheRCfg.setCacheMode(CacheMode.REPLICATED);
cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cachePCfg.setRebalanceBatchSize(1);
+ cachePCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>();
@@ -123,19 +127,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite.
*/
- protected void generateData(Ignite ignite) {
- generateData(ignite, CACHE_NAME_DHT_PARTITIONED);
- generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
- generateData(ignite, CACHE_NAME_DHT_REPLICATED);
- generateData(ignite, CACHE_NAME_DHT_REPLICATED_2);
+ protected void generateData(Ignite ignite, int from) {
+ generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from);
+ generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from);
+ generateData(ignite, CACHE_NAME_DHT_REPLICATED, from);
+ generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from);
}
/**
* @param ignite Ignite.
*/
- protected void generateData(Ignite ignite, String name) {
+ protected void generateData(Ignite ignite, String name, int from) {
try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(name)) {
- for (int i = 0; i < TEST_SIZE; i++) {
+ for (int i = from; i < from + TEST_SIZE; i++) {
if (i % (TEST_SIZE / 10) == 0)
log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
@@ -150,11 +154,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
* @param ignite Ignite.
* @throws IgniteCheckedException Exception.
*/
- protected void checkData(Ignite ignite) throws IgniteCheckedException {
- checkData(ignite, CACHE_NAME_DHT_PARTITIONED);
- checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2);
- checkData(ignite, CACHE_NAME_DHT_REPLICATED);
- checkData(ignite, CACHE_NAME_DHT_REPLICATED_2);
+ protected void checkData(Ignite ignite, int from) throws IgniteCheckedException {
+ checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from);
+ checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from);
+ checkData(ignite, CACHE_NAME_DHT_REPLICATED, from);
+ checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from);
}
/**
@@ -162,13 +166,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
* @param name Cache name.
* @throws IgniteCheckedException Exception.
*/
- protected void checkData(Ignite ignite, String name) throws IgniteCheckedException {
- for (int i = 0; i < TEST_SIZE; i++) {
+ protected void checkData(Ignite ignite, String name, int from) throws IgniteCheckedException {
+ for (int i = from; i < from + TEST_SIZE; i++) {
if (i % (TEST_SIZE / 10) == 0)
log.info("Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) :
- "value " + i + name.hashCode() + " does not match (" + ignite.cache(name).get(i) + ")";
+ "value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")";
}
}
@@ -178,7 +182,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
public void testSimpleRebalancing() throws Exception {
Ignite ignite = startGrid(0);
- generateData(ignite);
+ generateData(ignite, 0);
log.info("Preloading started.");
@@ -204,7 +208,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
long spend = (System.currentTimeMillis() - start) / 1000;
- checkData(grid(1));
+ checkData(grid(1), 0);
log.info("Spend " + spend + " seconds to rebalance entries.");
@@ -260,7 +264,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
public void testComplexRebalancing() throws Exception {
Ignite ignite = startGrid(0);
- generateData(ignite);
+ generateData(ignite, 0);
log.info("Preloading started.");
@@ -351,7 +355,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
long spend = (System.currentTimeMillis() - start) / 1000;
- checkData(grid(4));
+ checkData(grid(4), 0);
log.info("Spend " + spend + " seconds to rebalance entries.");
@@ -370,7 +374,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(map);
- generateData(ignite);
+ generateData(ignite, 0);
startGrid(1);
@@ -378,7 +382,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
stopGrid(0);
- checkData(grid(1));
+ checkData(grid(1), 0);
stopAllGrids();
}
@@ -389,7 +393,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
public void testNodeFailedAtRebalancing() throws Exception {
Ignite ignite = startGrid(0);
- generateData(ignite);
+ generateData(ignite, 0);
log.info("Preloading started.");