You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/11/04 15:11:19 UTC
[36/36] ignite git commit: IGNITE-426 Added test.
IGNITE-426 Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7236c3a1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7236c3a1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7236c3a1
Branch: refs/heads/ignite-462-2
Commit: 7236c3a14f99db7ec6b2fc6a5618cf905000fc12
Parents: c991859
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Nov 4 16:47:38 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Nov 4 17:02:58 2015 +0300
----------------------------------------------------------------------
...ContinuousQueryFailoverAbstractSelfTest.java | 100 +++++++++++++++++++
1 file changed, 100 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7236c3a1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 2c71bc2..dd4cf3e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
@@ -319,6 +320,105 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
/**
* @throws Exception If failed.
*/
+ public void testUpdatePartitionCounter() throws Exception {
+ this.backups = 2;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ final Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ Map<Integer, Long> updateCntrs = new HashMap<>();
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int killedNode = rnd.nextInt(SRV_NODES);
+
+ for (int i = 0; i < 20; i++) {
+ List<Integer> keys = testKeys(grid(0).cache(null), 10);
+
+ for (Integer key : keys) {
+ IgniteCache cache = null;
+
+ if (rnd.nextBoolean())
+ cache = qryClient.cache(null);
+ else {
+ for (int j = 0; j < 10; j++) {
+ int nodeIdx = rnd.nextInt(SRV_NODES);
+
+ if (killedNode != nodeIdx) {
+ cache = grid(nodeIdx).cache(null);
+
+ break;
+ }
+ }
+
+ if (cache == null)
+ throw new Exception("Failed to find a server node.");
+ }
+
+ cache.put(key, key);
+
+ int part = qryClient.affinity(null).partition(key);
+
+ Long cntr = updateCntrs.get(part);
+
+ if (cntr == null)
+ cntr = 0L;
+
+ updateCntrs.put(part, ++cntr);
+ }
+
+ checkPartCounter(SRV_NODES, killedNode, updateCntrs);
+
+ stopGrid(killedNode);
+
+ awaitPartitionMapExchange();
+
+ checkPartCounter(SRV_NODES, killedNode, updateCntrs);
+
+ startGrid(killedNode);
+
+ awaitPartitionMapExchange();
+
+ checkPartCounter(SRV_NODES, killedNode, updateCntrs);
+
+ killedNode = rnd.nextInt(SRV_NODES);
+ }
+ }
+
+ /**
+ * @param nodes Count nodes.
+ * @param killedNodeIdx Killed node index.
+ * @param updCntrs Update counters.
+ * @return {@code True} if counters matches.
+ */
+ private boolean checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long> updCntrs) {
+ for (int i = 0; i < nodes; i++) {
+ if (i == killedNodeIdx)
+ continue;
+
+ Affinity<Object> aff = grid(i).affinity(null);
+
+ Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters();
+
+ for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
+ if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode()))
+ assertEquals(e.getValue(), act.get(e.getKey()));
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testStartStopQuery() throws Exception {
this.backups = 1;