You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/11/04 15:11:19 UTC

[36/36] ignite git commit: IGNITE-426 Added test.

IGNITE-426 Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7236c3a1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7236c3a1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7236c3a1

Branch: refs/heads/ignite-462-2
Commit: 7236c3a14f99db7ec6b2fc6a5618cf905000fc12
Parents: c991859
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Nov 4 16:47:38 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Nov 4 17:02:58 2015 +0300

----------------------------------------------------------------------
 ...ContinuousQueryFailoverAbstractSelfTest.java | 100 +++++++++++++++++++
 1 file changed, 100 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7236c3a1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 2c71bc2..dd4cf3e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
@@ -319,6 +320,105 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      * @throws Exception If failed.
      */
+    public void testUpdatePartitionCounter() throws Exception {
+        this.backups = 2;
+
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        final Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        Map<Integer, Long> updateCntrs = new HashMap<>();
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        int killedNode = rnd.nextInt(SRV_NODES);
+
+        for (int i = 0; i < 20; i++) {
+            List<Integer> keys = testKeys(grid(0).cache(null), 10);
+
+            for (Integer key : keys) {
+                IgniteCache cache = null;
+
+                if (rnd.nextBoolean())
+                    cache = qryClient.cache(null);
+                else {
+                    for (int j = 0; j < 10; j++) {
+                        int nodeIdx = rnd.nextInt(SRV_NODES);
+
+                        if (killedNode != nodeIdx) {
+                            cache = grid(nodeIdx).cache(null);
+
+                            break;
+                        }
+                    }
+
+                    if (cache == null)
+                        throw new Exception("Failed to find a server node.");
+                }
+
+                cache.put(key, key);
+
+                int part = qryClient.affinity(null).partition(key);
+
+                Long cntr = updateCntrs.get(part);
+
+                if (cntr == null)
+                    cntr = 0L;
+
+                updateCntrs.put(part, ++cntr);
+            }
+
+            checkPartCounter(SRV_NODES, killedNode, updateCntrs);
+
+            stopGrid(killedNode);
+
+            awaitPartitionMapExchange();
+
+            checkPartCounter(SRV_NODES, killedNode, updateCntrs);
+
+            startGrid(killedNode);
+
+            awaitPartitionMapExchange();
+
+            checkPartCounter(SRV_NODES, killedNode, updateCntrs);
+
+            killedNode = rnd.nextInt(SRV_NODES);
+        }
+    }
+
+    /**
+     * @param nodes Number of nodes.
+     * @param killedNodeIdx Killed node index.
+     * @param updCntrs Update counters.
+     * @return {@code True} if counters match.
+     */
+    private boolean checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long> updCntrs) {
+        for (int i = 0; i < nodes; i++) {
+            if (i == killedNodeIdx)
+                continue;
+
+            Affinity<Object> aff = grid(i).affinity(null);
+
+            Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters();
+
+            for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
+                if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode()))
+                    assertEquals(e.getValue(), act.get(e.getKey()));
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testStartStopQuery() throws Exception {
         this.backups = 1;