You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/21 15:36:18 UTC
[1/2] incubator-ignite git commit: ignite-1093
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1093 a483f5220 -> dc10b85cc
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a5bd80de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a5bd80de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a5bd80de
Branch: refs/heads/ignite-1093
Commit: a5bd80ded53be77ff568f5c9edbbeddea91d1aff
Parents: a483f52
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 21 15:08:39 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 21 15:08:39 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 5 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 7 +-
...GridCacheMassiveRebalancingSyncSelfTest.java | 202 ++++++++++++++-----
3 files changed, 157 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e11addc..0c30630 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -291,7 +291,8 @@ public class GridDhtPartitionDemander {
AffinityTopologyVersion topVer = assigns.topologyVersion();
if (syncFut.isInited()) {
- syncFut.get();
+ if (!syncFut.isDone())
+ syncFut.onCancel();
syncFut = new SyncFuture(assigns);
}
@@ -791,7 +792,7 @@ public class GridDhtPartitionDemander {
Collection<Integer> parts = remaining.get(nodeId);
- if (parts!=null) {
+ if (parts != null) {
parts.remove(p);
if (parts.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index c496f8d..546e67b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -81,15 +81,13 @@ class GridDhtPartitionSupplier {
if (!cctx.kernalContext().clientNode()) {
for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
- final int idx = cnt;
-
cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
@Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
if (!enterBusy())
return;
try {
- processMessage(m, id, idx);
+ processMessage(m, id);
}
finally {
leaveBusy();
@@ -161,9 +159,8 @@ class GridDhtPartitionSupplier {
/**
* @param d Demand message.
* @param id Node uuid.
- * @param idx Index.
*/
- private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
+ private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
assert d != null;
assert id != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index cc82e79..d92ec86 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -39,11 +39,14 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
/** */
protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- private static int TEST_SIZE = 1_024_000;
+ private static int TEST_SIZE = 1_120_000;
/** cache name. */
protected static String CACHE_NAME_DHT = "cache";
+ /** cache 2 name. */
+ protected static String CACHE_2_NAME_DHT = "cache2";
+
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return Long.MAX_VALUE;
@@ -53,24 +56,33 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration iCfg = super.getConfiguration(gridName);
- CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
-
((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
if (getTestGridName(10).equals(gridName))
iCfg.setClientMode(true);
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
cacheCfg.setName(CACHE_NAME_DHT);
cacheCfg.setCacheMode(CacheMode.PARTITIONED);
//cacheCfg.setRebalanceBatchSize(1024);
//cacheCfg.setRebalanceBatchesCount(1);
cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
- cacheCfg.setRebalanceThreadPoolSize(4);
- //cacheCfg.setRebalanceTimeout(1000000);
+ cacheCfg.setRebalanceThreadPoolSize(8);
cacheCfg.setBackups(1);
- iCfg.setCacheConfiguration(cacheCfg);
+ CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>();
+
+ cacheCfg2.setName(CACHE_2_NAME_DHT);
+ cacheCfg2.setCacheMode(CacheMode.PARTITIONED);
+ //cacheCfg2.setRebalanceBatchSize(1024);
+ //cacheCfg2.setRebalanceBatchesCount(1);
+ cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheCfg2.setRebalanceThreadPoolSize(8);
+ cacheCfg2.setBackups(1);
+
+ iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);
return iCfg;
}
@@ -86,6 +98,14 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
stmr.addData(i, i);
}
}
+ try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+ stmr.addData(i, i + 3);
+ }
+ }
}
/**
@@ -97,7 +117,15 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
if (i % 1_000_000 == 0)
log.info("Checked " + i / 1_000_000 + "m entries.");
- assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
+ assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) :
+ "keys " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")";
+ }
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Checked " + i / 1_000_000 + "m entries.");
+
+ assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) :
+ "keys " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")";
}
}
@@ -125,7 +153,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
checkData(grid(1));
- log.info("Spend " + spend + " seconds to preload entries.");
+ log.info("Spend " + spend + " seconds to rebalance entries.");
stopAllGrids();
}
@@ -148,79 +176,153 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
startGrid(3);
startGrid(4);
- GridCachePreloader p1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
- GridCachePreloader p2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
- GridCachePreloader p3 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
- GridCachePreloader p4 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+ //wait until cache rebalanced in async mode
- IgniteInternalFuture f4 = p4.syncFuture();
- f4.get();
+ GridCachePreloader p11 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+ GridCachePreloader p12 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+ GridCachePreloader p13 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+ GridCachePreloader p14 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
- AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f4).topologyVersion();
+ GridCachePreloader p21 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
+ GridCachePreloader p22 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
+ GridCachePreloader p23 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
+ GridCachePreloader p24 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
- IgniteInternalFuture f1 = p1.syncFuture();
- IgniteInternalFuture f2 = p2.syncFuture();
- IgniteInternalFuture f3 = p3.syncFuture();
+ IgniteInternalFuture f24 = p24.syncFuture();
+ f24.get();
- while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(f4Top) ||
- !((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion().equals(f4Top) ||
- !((GridDhtPartitionDemander.SyncFuture)f3).topologyVersion().equals(f4Top)) {
+ IgniteInternalFuture f14 = p14.syncFuture();
+ f14.get();
+
+ AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f24).topologyVersion();
+
+ IgniteInternalFuture f11 = p11.syncFuture();
+ IgniteInternalFuture f12 = p12.syncFuture();
+ IgniteInternalFuture f13 = p13.syncFuture();
+
+ while (!((GridDhtPartitionDemander.SyncFuture)f11).topologyVersion().equals(f4Top) ||
+ !((GridDhtPartitionDemander.SyncFuture)f12).topologyVersion().equals(f4Top) ||
+ !((GridDhtPartitionDemander.SyncFuture)f13).topologyVersion().equals(f4Top)) {
U.sleep(100);
- f1 = p1.syncFuture();
- f2 = p2.syncFuture();
- f3 = p3.syncFuture();
+ f11 = p11.syncFuture();
+ f12 = p12.syncFuture();
+ f13 = p13.syncFuture();
}
- f1.get();
- f2.get();
- f3.get();
+ f11.get();
+ f12.get();
+ f13.get();
- long spend = (System.currentTimeMillis() - start) / 1000;
+ IgniteInternalFuture f21 = p21.syncFuture();
+ IgniteInternalFuture f22 = p22.syncFuture();
+ IgniteInternalFuture f23 = p23.syncFuture();
+
+ while (!((GridDhtPartitionDemander.SyncFuture)f21).topologyVersion().equals(f4Top) ||
+ !((GridDhtPartitionDemander.SyncFuture)f22).topologyVersion().equals(f4Top) ||
+ !((GridDhtPartitionDemander.SyncFuture)f23).topologyVersion().equals(f4Top)) {
+ U.sleep(100);
+
+ f21 = p21.syncFuture();
+ f22 = p22.syncFuture();
+ f23 = p23.syncFuture();
+ }
+ f21.get();
+ f22.get();
+ f23.get();
- f1 = p1.syncFuture();
- f2 = p2.syncFuture();
- f3 = p3.syncFuture();
- f4 = p4.syncFuture();
+ //cache rebalanced in async mode
+
+ f11 = p11.syncFuture();
+ f12 = p12.syncFuture();
+ f13 = p13.syncFuture();
+ f14 = p14.syncFuture();
+
+ f21 = p21.syncFuture();
+ f22 = p22.syncFuture();
+ f23 = p23.syncFuture();
+ f24 = p24.syncFuture();
stopGrid(0);
- while (f1 == p1.syncFuture() || f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture())
+ //wait until cache rebalanced
+
+ while (f11 == p11.syncFuture() || f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture())
U.sleep(100);
- p1.syncFuture().get();
- p2.syncFuture().get();
- p3.syncFuture().get();
- p4.syncFuture().get();
+ while (f21 == p21.syncFuture() || f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture())
+ U.sleep(100);
+
+ p11.syncFuture().get();
+ p12.syncFuture().get();
+ p13.syncFuture().get();
+ p14.syncFuture().get();
+
+ p21.syncFuture().get();
+ p22.syncFuture().get();
+ p23.syncFuture().get();
+ p24.syncFuture().get();
- f2 = p2.syncFuture();
- f3 = p3.syncFuture();
- f4 = p4.syncFuture();
+ //cache rebalanced
+
+ f12 = p12.syncFuture();
+ f13 = p13.syncFuture();
+ f14 = p14.syncFuture();
+
+ f22 = p22.syncFuture();
+ f23 = p23.syncFuture();
+ f24 = p24.syncFuture();
stopGrid(1);
- while (f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture())
+ //wait until cache rebalanced
+
+ while (f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture())
U.sleep(100);
- p2.syncFuture().get();
- p3.syncFuture().get();
- p4.syncFuture().get();
+ while (f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture())
+ U.sleep(100);
+
+ p12.syncFuture().get();
+ p13.syncFuture().get();
+ p14.syncFuture().get();
+
+ p22.syncFuture().get();
+ p23.syncFuture().get();
+ p24.syncFuture().get();
- f3 = p3.syncFuture();
- f4 = p4.syncFuture();
+ //cache rebalanced
+
+ f13 = p13.syncFuture();
+ f14 = p14.syncFuture();
+
+ f23 = p23.syncFuture();
+ f24 = p24.syncFuture();
stopGrid(2);
- while (f3 == p3.syncFuture() || f4 == p4.syncFuture())
+ //wait until cache rebalanced
+
+ while (f13 == p13.syncFuture() || f14 == p14.syncFuture())
U.sleep(100);
- p3.syncFuture().get();
- p4.syncFuture().get();
+ while (f23 == p23.syncFuture() || f24 == p24.syncFuture())
+ U.sleep(100);
+
+ p13.syncFuture().get();
+ p14.syncFuture().get();
+
+ p23.syncFuture().get();
+ p24.syncFuture().get();
+
+ //cache rebalanced
stopGrid(3);
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
checkData(grid(4));
- log.info("Spend " + spend + " seconds to preload entries.");
+ log.info("Spend " + spend + " seconds to rebalance entries.");
stopAllGrids();
}
[2/2] incubator-ignite git commit: ignite-1093
Posted by av...@apache.org.
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dc10b85c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dc10b85c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dc10b85c
Branch: refs/heads/ignite-1093
Commit: dc10b85ccc83b1f2999685a10b1997e3e4f5d18b
Parents: a5bd80d
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 21 16:35:49 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 21 16:35:49 2015 +0300
----------------------------------------------------------------------
.../distributed/dht/preloader/GridDhtPartitionDemander.java | 5 ++++-
.../rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java | 4 ++--
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc10b85c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0c30630..126bde9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -339,7 +339,10 @@ public class GridDhtPartitionDemander {
syncFut.append(node.id(), remainings);
- int lsnrCnt = cctx.config().getRebalanceThreadPoolSize();
+ int lsnrCnt = cctx.config().getRebalanceThreadPoolSize() / assigns.entrySet().size();
+
+ if (lsnrCnt == 0)
+ lsnrCnt = 1;//At least one listener per node.
List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc10b85c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index d92ec86..1182254 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -69,7 +69,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
//cacheCfg.setRebalanceBatchSize(1024);
//cacheCfg.setRebalanceBatchesCount(1);
cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
- cacheCfg.setRebalanceThreadPoolSize(8);
+ //cacheCfg.setRebalanceThreadPoolSize(8);
cacheCfg.setBackups(1);
CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>();
@@ -79,7 +79,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
//cacheCfg2.setRebalanceBatchSize(1024);
//cacheCfg2.setRebalanceBatchesCount(1);
cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
- cacheCfg2.setRebalanceThreadPoolSize(8);
+ //cacheCfg2.setRebalanceThreadPoolSize(8);
cacheCfg2.setBackups(1);
iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);