You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/17 13:31:24 UTC
incubator-ignite git commit: ignite-1093
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1093 64319443a -> 9da4b9367
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9da4b936
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9da4b936
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9da4b936
Branch: refs/heads/ignite-1093
Commit: 9da4b936795813b1786a7c0c5ed2abe87f504e09
Parents: 6431944
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Mon Aug 17 12:38:11 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Mon Aug 17 12:38:11 2015 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 13 +++-
...GridCacheMassiveRebalancingSyncSelfTest.java | 81 +++++++++++++++-----
2 files changed, 72 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9da4b936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 262ccb7..6d024de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -255,6 +255,14 @@ public class GridDhtPartitionDemander {
}
/**
+ * @param type Type of the event; forwarded unchanged to the three-argument {@code preloadEvent} overload.
+ * @param discoEvt Discovery event; forwarded unchanged to the three-argument {@code preloadEvent} overload.
+ */
+ private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+ preloadEvent(-1, type, discoEvt); // -1 goes in the leading (partition) slot; presumably "no specific partition" - confirm against the overload.
+ }
+
+ /**
* @param part Partition.
* @param type Type.
* @param discoEvt Discovery event.
@@ -796,7 +804,10 @@ public class GridDhtPartitionDemander {
missed.clear();
- cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary?
+ cctx.shared().exchange().scheduleResendPartitions();
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && !cctx.isReplicated())
+ preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
onDone(cancelled);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9da4b936/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index cd12954..91352ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -57,7 +58,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
- if (getTestGridName(3).equals(gridName))
+ if (getTestGridName(10).equals(gridName))
iCfg.setClientMode(true);
cacheCfg.setName(CACHE_NAME_DHT);
@@ -141,45 +142,83 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
long start = System.currentTimeMillis();
+ //will be started simultaneously in case of ASYNC mode
startGrid(1);
startGrid(2);
+ startGrid(3);
+ startGrid(4);
- IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
- f2.get();
+ GridCachePreloader p1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+ GridCachePreloader p2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+ GridCachePreloader p3 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+ GridCachePreloader p4 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
- IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
- while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion())) {
+ IgniteInternalFuture f4 = p4.syncFuture();
+ f4.get();
+
+ AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f4).topologyVersion();
+
+ IgniteInternalFuture f1 = p1.syncFuture();
+ IgniteInternalFuture f2 = p2.syncFuture();
+ IgniteInternalFuture f3 = p3.syncFuture();
+
+ while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(f4Top) ||
+ !((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion().equals(f4Top) ||
+ !((GridDhtPartitionDemander.SyncFuture)f3).topologyVersion().equals(f4Top)) {
U.sleep(100);
- f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+ f1 = p1.syncFuture();
+ f2 = p2.syncFuture();
+ f3 = p3.syncFuture();
}
f1.get();
+ f2.get();
+ f3.get();
long spend = (System.currentTimeMillis() - start) / 1000;
- f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
- f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+ f1 = p1.syncFuture();
+ f2 = p2.syncFuture();
+ f3 = p3.syncFuture();
+ f4 = p4.syncFuture();
stopGrid(0);
- //TODO: refactor to get futures by topology
- while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() ||
- f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+ while (f1 == p1.syncFuture() || f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture())
U.sleep(100);
- ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
- ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+ p1.syncFuture().get();
+ p2.syncFuture().get();
+ p3.syncFuture().get();
+ p4.syncFuture().get();
- f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+ f2 = p2.syncFuture();
+ f3 = p3.syncFuture();
+ f4 = p4.syncFuture();
stopGrid(1);
- while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+ while (f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture())
U.sleep(100);
- ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+ p2.syncFuture().get();
+ p3.syncFuture().get();
+ p4.syncFuture().get();
- checkData(grid(2));
+ f3 = p3.syncFuture();
+ f4 = p4.syncFuture();
+
+ stopGrid(2);
+
+ while (f3 == p3.syncFuture() || f4 == p4.syncFuture())
+ U.sleep(100);
+
+ p3.syncFuture().get();
+ p4.syncFuture().get();
+
+ stopGrid(3);
+
+ checkData(grid(4));
log.info("Spend " + spend + " seconds to preload entries.");
@@ -198,7 +237,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
startGrid(1);
startGrid(2);
- startGrid(3);
+ startGrid(10);
Thread t = new Thread(new Runnable() {
@Override public void run() {
@@ -214,10 +253,10 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
int size = 1000;
for (int i = 0; i < size; i++)
- grid(3).cachex(CACHE_NAME_DHT).remove(i);
+ grid(10).cachex(CACHE_NAME_DHT).remove(i);
for (int i = 0; i < size; i++)
- grid(3).cachex(CACHE_NAME_DHT).put(i, i);
+ grid(10).cachex(CACHE_NAME_DHT).put(i, i);
spend += System.currentTimeMillis() - start;
@@ -245,7 +284,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
cancelled.set(true);
t.join();
- checkData(grid(3));
+ checkData(grid(10));
//stopAllGrids();
}