You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/17 04:04:21 UTC

[15/55] [abbrv] ignite git commit: IGNITE-1239 - Added test for reopened ticket.

IGNITE-1239 - Added test for reopened ticket.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/866fb415
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/866fb415
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/866fb415

Branch: refs/heads/ignite-1171
Commit: 866fb41525957555231fca11c5853731b9473170
Parents: 06fdd7d
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 14 16:09:37 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 14 16:09:37 2015 -0700

----------------------------------------------------------------------
 ...CacheScanPartitionQueryFallbackSelfTest.java | 105 ++++++++++++++++++-
 1 file changed, 104 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/866fb415/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index cb3a3bf..df310b4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -26,13 +26,19 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -48,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -67,7 +74,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     private static final int GRID_CNT = 3;
 
     /** Keys count. */
-    private static final int KEYS_CNT = 5000;
+    private static final int KEYS_CNT = 50 * RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
 
     /** Ip finder. */
     private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -261,6 +268,79 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
+     * Scan should activate fallback mechanism when new nodes join topology and rebalancing happens in parallel with
+     * scan query. Currently the test starts with a {@code fail(...)} call that references IGNITE-1239.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testScanFallbackOnRebalancingCursor() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-1239");
+
+        cacheMode = CacheMode.PARTITIONED;
+        clientMode = false;
+        backups = 1;
+        commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
+
+        try {
+            Ignite ignite = startGrids(GRID_CNT);
+
+            fillCache(ignite);
+
+            final AtomicBoolean done = new AtomicBoolean(false);
+
+            IgniteInternalFuture fut1 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        for (int i = 0; i < 5; i++) { // Start five more nodes, one after another.
+                            startGrid(GRID_CNT + i);
+
+                            U.sleep(500);
+                        }
+
+                        done.set(true); // Stops the query loops; NOTE(review): not reached if startGrid() throws.
+
+                        return null;
+                    }
+                }, 1); // NOTE(review): second argument is presumably the thread count - confirm.
+
+            final AtomicInteger nodeIdx = new AtomicInteger();
+
+            IgniteInternalFuture fut2 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int nodeId = nodeIdx.getAndIncrement(); // Distinct index per call, starting from 0.
+
+                        IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
+
+                        while (!done.get()) { // Keep querying random partitions until node starts are over.
+                            int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
+
+                            try {
+                                QueryCursor<Cache.Entry<Integer, Integer>> cur =
+                                    cache.query(new ScanQuery<Integer, Integer>(part));
+
+                                U.debug(log, "Running query [node=" + nodeId + ", part=" + part + ']');
+
+                                doTestScanQueryCursor(cur, part); // Checks entry values and their number.
+                            }
+                            catch (ClusterGroupEmptyCheckedException e) {
+                                log.warning("Invalid partition: " + part, e);
+                            }
+                        }
+
+                        return null;
+                    }
+                }, GRID_CNT); // NOTE(review): presumably one query thread per initial node - confirm.
+
+            fut1.get(); // Wait for the node-starting task.
+            fut2.get(); // Wait for the query tasks.
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * Scan should try first remote node and fallbacks to second remote node.
      *
      * @throws Exception If failed.
@@ -391,6 +471,29 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
+     * @param cur Query cursor to iterate; every returned entry is compared with the expected value for its key.
+     * @param part Partition number used to look up the expected entries.
+     */
+    protected void doTestScanQueryCursor(
+        QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) throws IgniteCheckedException {
+        // Number of entries returned by the cursor.
+        int seen = 0;
+
+        Map<Integer, Integer> expected = entries.get(part);
+
+        assert expected != null;
+
+        for (Cache.Entry<Integer, Integer> entry : cur) {
+            // Value returned by the query must match the expected one for the same key.
+            assertEquals(expected.get(entry.getKey()), entry.getValue());
+
+            seen++;
+        }
+
+        assertEquals("Invalid number of entries for partition: " + part, expected.size(), seen);
+    }
+
+    /**
      * @param cctx Cctx.
      */
     private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {