You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 15:12:28 UTC
[02/50] incubator-ignite git commit: ignite-389 Partition scan query
fallback test
ignite-389 Partition scan query fallback test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29dc7221
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29dc7221
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29dc7221
Branch: refs/heads/ignite-745
Commit: 29dc7221c12db1e39a17de4471a8c5ebed4b8709
Parents: 5d6bb53
Author: agura <ag...@gridgain.com>
Authored: Fri May 29 16:28:34 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 29 16:28:34 2015 +0300
----------------------------------------------------------------------
...CacheScanPartitionQueryFallbackSelfTest.java | 335 ++++++++++++++-----
1 file changed, 259 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29dc7221/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 31336e6..dfa7296 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -21,8 +21,10 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lang.*;
@@ -32,15 +34,17 @@ import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.testframework.junits.common.*;
import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
/**
* Tests partition scan query fallback.
*/
public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractTest {
/** Grid count. */
- private static final int GRID_CNT = 5;
+ private static final int GRID_CNT = 3;
- /** Kys count. */
+ /** Keys count. */
private static final int KEYS_CNT = 5000;
/** Backups. */
@@ -49,20 +53,29 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
/** Cache mode. */
private CacheMode cacheMode;
- /** Fallback. */
- private boolean fallback;
+ /** Client mode. */
+ private volatile boolean clientMode;
- /** Primary node id. */
- private static volatile UUID expNodeId;
+ /** Expected first node ID. */
+ private static UUID expNodeId;
- /** Fail node id. */
- private static volatile UUID failNodeId;
+ /** Expected fallback node ID. */
+ private static UUID expFallbackNodeId;
+
+ /** Communication SPI factory. */
+ private CommunicationSpiFactory commSpiFactory;
+
+ /** Latch. */
+ private static CountDownLatch latch;
+
+ /** Test entries. */
+ private Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- cfg.setCommunicationSpi(new TestCommunicationSpi());
+ cfg.setCommunicationSpi(commSpiFactory.create());
CacheConfiguration ccfg = defaultCacheConfiguration();
ccfg.setCacheMode(cacheMode);
@@ -72,142 +85,312 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
cfg.setCacheConfiguration(ccfg);
+ cfg.setClientMode(clientMode);
+
return cfg;
}
/**
+ * Scan should perform on the local node.
+ *
* @throws Exception If failed.
*/
- public void testPrimary() throws Exception {
+ public void testScanLocal() throws Exception {
cacheMode = CacheMode.PARTITIONED;
backups = 0;
- failNodeId = null;
- fallback = false;
+ commSpiFactory = new TestLocalCommunicationSpiFactory();
- doTestScanPartition();
+ try {
+ Ignite ignite = startGrids(GRID_CNT);
+
+ IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite);
+
+ int part = anyLocalPartition(cache.context());
+
+ CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+
+ doTestScanQuery(qry);
+ }
+ finally {
+ stopAllGrids();
+ }
}
/**
+ * Scan should perform on the remote node.
+ *
* @throws Exception If failed.
*/
- public void testFallbackToBackup() throws Exception {
+ public void testScanRemote() throws Exception {
cacheMode = CacheMode.PARTITIONED;
- backups = 1;
- failNodeId = null;
- fallback = true;
+ backups = 0;
+ commSpiFactory = new TestRemoteCommunicationSpiFactory();
- doTestScanPartition();
+ try {
+ Ignite ignite = startGrids(GRID_CNT);
+
+ IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite);
+
+ IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context());
+
+ int part = tup.get1();
+
+ expNodeId = tup.get2();
+
+ CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+
+ doTestScanQuery(qry);
+ }
+ finally {
+ stopAllGrids();
+ }
}
/**
+ * Scan should try first remote node and fallbacks to second remote node.
+ *
* @throws Exception If failed.
*/
- protected void doTestScanPartition() throws Exception {
- try {
- Ignite ignite = startGrids(GRID_CNT);
+ public void testScanFallback() throws Exception {
+ cacheMode = CacheMode.PARTITIONED;
+ backups = 1;
+ commSpiFactory = new TestFallbackCommunicationSpiFactory();
- IgniteCacheProxy<Integer, Integer> cache =
- (IgniteCacheProxy<Integer, Integer>)ignite.<Integer, Integer>cache(null);
+ final Set<Integer> candidates = new TreeSet<>();
- Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+ final AtomicBoolean test = new AtomicBoolean(false);
- for (int i = 0; i < KEYS_CNT; i++) {
- cache.put(i, i);
+ for(int j = 0; j < 2; j++) {
+ clientMode = true;
- int part = cache.context().affinity().partition(i);
+ latch = new CountDownLatch(1);
- Map<Integer, Integer> partEntries = entries.get(part);
+ try {
+ final Ignite ignite0 = startGrid(0);
- if (partEntries == null)
- entries.put(part, partEntries = new HashMap<>());
+ clientMode = false;
- partEntries.put(i, i);
- }
+ final IgniteEx ignite1 = startGrid(1);
+ final IgniteEx ignite2 = startGrid(2);
+ startGrid(3);
- IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context(), true);
+ if (test.get()) {
+ expNodeId = ignite1.localNode().id();
+ expFallbackNodeId = ignite2.localNode().id();
+ }
- int part = tup.get1();
+ final IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite0);
- if (fallback)
- failNodeId = tup.get2();
- else
- expNodeId = tup.get2();
+ if (!test.get()) {
+ candidates.addAll(localPartitions(ignite1));
+ candidates.retainAll(localPartitions(ignite2));
+ }
- if (fallback)
- expNodeId = remoteBackup(part, cache.context());
+ Runnable run = new Runnable() {
+ @Override public void run() {
+ try {
+ startGrid(4);
+ startGrid(5);
- CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+ awaitPartitionMapExchange();
+
+ if (!test.get()) {
+ Set<Integer> parts = localPartitions(ignite1);
+ candidates.removeAll(parts);
+ }
- CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+ latch.countDown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
- Collection<Map.Entry<Integer, Integer>> expEntries = fut.get();
+ }
+ };
- for (Map.Entry<Integer, Integer> e : expEntries) {
- Map<Integer, Integer> map = entries.get(part);
+ int part;
+ CacheQuery<Map.Entry<Integer, Integer>> qry = null;
- if(map == null)
- assertTrue(expEntries.isEmpty());
+ if (test.get()) {
+ part = F.first(candidates);
+
+ qry = cache.context().queries().createScanQuery(null, part, false);
+ }
+
+ new Thread(run).start();
+
+ if (test.get())
+ doTestScanQuery(qry);
else
- assertEquals(map.get(e.getKey()), e.getValue());
+ latch.await();
+ }
+ finally {
+ test.set(true);
+
+ stopAllGrids();
}
}
- finally {
- stopAllGrids();
+ }
+
+ /**
+ * @param ignite Ignite.
+ */
+ protected IgniteCacheProxy<Integer, Integer> fillCache(Ignite ignite) {
+ IgniteCacheProxy<Integer, Integer> cache =
+ (IgniteCacheProxy<Integer, Integer>)ignite.<Integer, Integer>cache(null);
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ cache.put(i, i);
+
+ int part = cache.context().affinity().partition(i);
+
+ Map<Integer, Integer> partEntries = entries.get(part);
+
+ if (partEntries == null)
+ entries.put(part, partEntries = new HashMap<>());
+
+ partEntries.put(i, i);
}
+
+ return cache;
+ }
+
+ /**
+ * @param qry Query.
+ */
+ protected void doTestScanQuery(
+ CacheQuery<Map.Entry<Integer, Integer>> qry) throws IgniteCheckedException {
+ CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+ Collection<Map.Entry<Integer, Integer>> expEntries = fut.get();
+
+ for (Map.Entry<Integer, Integer> e : expEntries) {
+ Map<Integer, Integer> map = entries.get(((GridCacheQueryAdapter)qry).partition());
+
+ if (map == null)
+ assertTrue(expEntries.isEmpty());
+ else
+ assertEquals(map.get(e.getKey()), e.getValue());
+ }
+ }
+
+ /**
+ * @param cctx Cctx.
+ */
+ private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {
+ return F.first(cctx.topology().localPartitions()).id();
}
/**
* @param cctx Cctx.
- * @param primary Primary.
*/
- private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx, boolean primary) {
+ private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx) {
ClusterNode node = F.first(cctx.kernalContext().grid().cluster().forRemotes().nodes());
GridCacheAffinityManager affMgr = cctx.affinity();
AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion();
- Set<Integer> parts = primary ?
- affMgr.primaryPartitions(node.id(), topVer) : affMgr.backupPartitions(node.id(), topVer);
+ Set<Integer> parts = affMgr.primaryPartitions(node.id(), topVer);
return new IgniteBiTuple<>(F.first(parts), node.id());
}
/**
- * @param part Partition.
- * @param cctx Cctx.
+ * @param ignite Ignite.
*/
- private UUID remoteBackup(int part, final GridCacheContext cctx) {
- final UUID locUuid = cctx.localNodeId();
+ private Set<Integer> localPartitions(Ignite ignite) {
+ GridCacheContext cctx = ((IgniteCacheProxy)ignite.cache(null)).context();
+
+ Collection<GridDhtLocalPartition> owningParts = F.view(cctx.topology().localPartitions(),
+ new IgnitePredicate<GridDhtLocalPartition>() {
+ @Override public boolean apply(GridDhtLocalPartition part) {
+ return part.state() == GridDhtPartitionState.OWNING;
+ }
+ });
+
+ return new HashSet<>(F.transform(owningParts, new IgniteClosure<GridDhtLocalPartition, Integer>() {
+ @Override public Integer apply(GridDhtLocalPartition part) {
+ return part.id();
+ }
+ }));
+ }
- GridCacheAffinityManager affMgr = cctx.affinity();
+ /**
+ * Factory for tests specific communication SPI.
+ */
+ private interface CommunicationSpiFactory {
+ /**
+ * Creates communication SPI instance.
+ */
+ TcpCommunicationSpi create();
+ }
- AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion();
+ /**
+ *
+ */
+ private static class TestLocalCommunicationSpiFactory implements CommunicationSpiFactory {
+ /** {@inheritDoc} */
+ @Override public TcpCommunicationSpi create() {
+ return new TcpCommunicationSpi() {
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ Object origMsg = ((GridIoMessage)msg).message();
- return F.first(F.view(affMgr.backups(part, topVer), new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return !node.id().equals(locUuid);
- }
- })).id();
+ if (origMsg instanceof GridCacheQueryRequest)
+ fail(); //should use local node
+
+ super.sendMessage(node, msg);
+ }
+ };
+ }
}
/**
*
*/
- private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ private static class TestRemoteCommunicationSpiFactory implements CommunicationSpiFactory {
/** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg)
- throws IgniteSpiException {
- Object origMsg = ((GridIoMessage)msg).message();
+ @Override public TcpCommunicationSpi create() {
+ return new TcpCommunicationSpi() {
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ Object origMsg = ((GridIoMessage)msg).message();
- if (origMsg instanceof GridCacheQueryRequest) {
- if (node.id().equals(failNodeId))
- throw new IgniteSpiException("");
- else
- assertEquals(expNodeId, node.id());
- }
+ if (origMsg instanceof GridCacheQueryRequest)
+ assertEquals(expNodeId, node.id());
+
+ super.sendMessage(node, msg);
+ }
+ };
+ }
+ }
- super.sendMessage(node, msg);
+ /**
+ *
+ */
+ private static class TestFallbackCommunicationSpiFactory implements CommunicationSpiFactory {
+ /** {@inheritDoc} */
+ @Override public TcpCommunicationSpi create() {
+ return new TcpCommunicationSpi() {
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ Object origMsg = ((GridIoMessage)msg).message();
+
+ if (origMsg instanceof GridCacheQueryRequest) {
+ if (latch.getCount() > 0)
+ assertEquals(expNodeId, node.id());
+ else
+ assertEquals(expFallbackNodeId, node.id());
+
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteSpiException(e);
+ }
+ }
+
+ super.sendMessage(node, msg);
+ }
+ };
}
}
}