You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/15 01:09:45 UTC
ignite git commit: IGNITE-1239 - Added test for reopened ticket.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.4 06fdd7d44 -> 866fb4152
IGNITE-1239 - Added test for reopened ticket.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/866fb415
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/866fb415
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/866fb415
Branch: refs/heads/ignite-1.4
Commit: 866fb41525957555231fca11c5853731b9473170
Parents: 06fdd7d
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 14 16:09:37 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 14 16:09:37 2015 -0700
----------------------------------------------------------------------
...CacheScanPartitionQueryFallbackSelfTest.java | 105 ++++++++++++++++++-
1 file changed, 104 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/866fb415/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index cb3a3bf..df310b4 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -26,13 +26,19 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -48,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
@@ -67,7 +74,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
private static final int GRID_CNT = 3;
/** Keys count. */
- private static final int KEYS_CNT = 5000;
+ private static final int KEYS_CNT = 50 * RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
/** Ip finder. */
private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -261,6 +268,79 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
}
/**
+ * Scan should activate fallback mechanism when new nodes join topology and rebalancing happens in parallel with
+ * scan query.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testScanFallbackOnRebalancingCursor() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1239"); // Presumably the test framework's fail(): the test is reported as failed with the ticket link and the body below does not run.
+
+ cacheMode = CacheMode.PARTITIONED; // These four fields are declared outside this view; presumably consumed when node/cache configuration is built.
+ clientMode = false;
+ backups = 1;
+ commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
+
+ try {
+ Ignite ignite = startGrids(GRID_CNT); // Bring up the initial GRID_CNT nodes.
+
+ fillCache(ignite); // Helper defined elsewhere in this class; presumably also populates the 'entries' map checked by doTestScanQueryCursor - confirm.
+
+ final AtomicBoolean done = new AtomicBoolean(false); // Stop flag: set by the node-starting task, polled by the query tasks.
+
+ IgniteInternalFuture fut1 = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < 5; i++) { // Start five additional nodes, indexed after the initial ones.
+ startGrid(GRID_CNT + i);
+
+ U.sleep(500); // Pause between node starts so that queries run while topology keeps changing.
+ }
+
+ done.set(true); // NOTE(review): not reached if startGrid() throws, so the query tasks would keep looping - confirm whether a finally is wanted.
+
+ return null;
+ }
+ }, 1); // Assumes the second argument is the number of threads running the callable - confirm.
+
+ final AtomicInteger nodeIdx = new AtomicInteger(); // Hands out a distinct node index (starting at 0) to each query task.
+
+ IgniteInternalFuture fut2 = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int nodeId = nodeIdx.getAndIncrement();
+
+ IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
+
+ while (!done.get()) { // Keep querying until the node-starting task has finished.
+ int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions()); // Random partition in [0, partitions()).
+
+ try {
+ QueryCursor<Cache.Entry<Integer, Integer>> cur =
+ cache.query(new ScanQuery<Integer, Integer>(part));
+
+ U.debug(log, "Running query [node=" + nodeId + ", part=" + part + ']');
+
+ doTestScanQueryCursor(cur, part); // Verifies the cursor content against the expected entries of this partition.
+ }
+ catch (ClusterGroupEmptyCheckedException e) {
+ log.warning("Invalid partition: " + part, e); // Logged and tolerated: the loop moves on to another random partition.
+ }
+ }
+
+ return null;
+ }
+ }, GRID_CNT); // Same assumption as above: GRID_CNT query threads, one per initially started node.
+
+ fut1.get(); // Wait for the node-starting task first, then for the query tasks.
+ fut2.get();
+ }
+ finally {
+ stopAllGrids(); // Always tear the grids down, even when a future fails.
+ }
+ }
+
+ /**
* Scan should try first remote node and fallbacks to second remote node.
*
* @throws Exception If failed.
@@ -391,6 +471,29 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
}
/**
+ * @param cur Query cursor.
+ * @param part Partition number.
+ */
+ protected void doTestScanQueryCursor(
+ QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) throws IgniteCheckedException {
+
+ Map<Integer, Integer> map = entries.get(part); // Expected key-value pairs for the partition being scanned.
+
+ assertNotNull("No expected entries for partition: " + part, map); // Framework assertion: unlike the 'assert' keyword it is also checked when the JVM runs without -ea.
+
+ int cnt = 0; // Number of entries actually returned by the cursor.
+
+ for (Cache.Entry<Integer, Integer> e : cur) {
+
+ assertEquals(map.get(e.getKey()), e.getValue()); // Every returned entry must match the expected value for its key.
+
+ cnt++;
+ }
+
+ assertEquals("Invalid number of entries for partition: " + part, map.size(), cnt); // No entry may be missing from the scan result.
+ }
+
+ /**
* @param cctx Cctx.
*/
private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {