You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/11 09:18:23 UTC

[08/50] incubator-ignite git commit: ignite-389 Partition scan query fallback test

ignite-389 Partition scan query fallback test


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29dc7221
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29dc7221
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29dc7221

Branch: refs/heads/ignite-484-1
Commit: 29dc7221c12db1e39a17de4471a8c5ebed4b8709
Parents: 5d6bb53
Author: agura <ag...@gridgain.com>
Authored: Fri May 29 16:28:34 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 29 16:28:34 2015 +0300

----------------------------------------------------------------------
 ...CacheScanPartitionQueryFallbackSelfTest.java | 335 ++++++++++++++-----
 1 file changed, 259 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29dc7221/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 31336e6..dfa7296 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -21,8 +21,10 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
@@ -32,15 +34,17 @@ import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.testframework.junits.common.*;
 
 import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
 /**
  * Tests partition scan query fallback.
  */
 public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractTest {
     /** Grid count. */
-    private static final int GRID_CNT = 5;
+    private static final int GRID_CNT = 3;
 
-    /** Kys count. */
+    /** Keys count. */
     private static final int KEYS_CNT = 5000;
 
     /** Backups. */
@@ -49,20 +53,29 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     /** Cache mode. */
     private CacheMode cacheMode;
 
-    /** Fallback. */
-    private boolean fallback;
+    /** Client mode. */
+    private volatile boolean clientMode;
 
-    /** Primary node id. */
-    private static volatile UUID expNodeId;
+    /** Expected first node ID. */
+    private static UUID expNodeId;
 
-    /** Fail node id. */
-    private static volatile UUID failNodeId;
+    /** Expected fallback node ID. */
+    private static UUID expFallbackNodeId;
+
+    /** Communication SPI factory. */
+    private CommunicationSpiFactory commSpiFactory;
+
+    /** Latch. */
+    private static CountDownLatch latch;
+
+    /** Test entries. */
+    private Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        cfg.setCommunicationSpi(new TestCommunicationSpi());
+        cfg.setCommunicationSpi(commSpiFactory.create());
 
         CacheConfiguration ccfg = defaultCacheConfiguration();
         ccfg.setCacheMode(cacheMode);
@@ -72,142 +85,312 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
 
         cfg.setCacheConfiguration(ccfg);
 
+        cfg.setClientMode(clientMode);
+
         return cfg;
     }
 
     /**
+     * Scan should be performed on the local node.
+     *
      * @throws Exception If failed.
      */
-    public void testPrimary() throws Exception {
+    public void testScanLocal() throws Exception {
         cacheMode = CacheMode.PARTITIONED;
         backups = 0;
-        failNodeId = null;
-        fallback = false;
+        commSpiFactory = new TestLocalCommunicationSpiFactory();
 
-        doTestScanPartition();
+        try {
+            Ignite ignite = startGrids(GRID_CNT);
+
+            IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite);
+
+            int part = anyLocalPartition(cache.context());
+
+            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+
+            doTestScanQuery(qry);
+        }
+        finally {
+            stopAllGrids();
+        }
     }
 
     /**
+     * Scan should be performed on the remote node.
+     *
      * @throws Exception If failed.
      */
-    public void testFallbackToBackup() throws Exception {
+    public void testScanRemote() throws Exception {
         cacheMode = CacheMode.PARTITIONED;
-        backups = 1;
-        failNodeId = null;
-        fallback = true;
+        backups = 0;
+        commSpiFactory = new TestRemoteCommunicationSpiFactory();
 
-        doTestScanPartition();
+        try {
+            Ignite ignite = startGrids(GRID_CNT);
+
+            IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite);
+
+            IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context());
+
+            int part = tup.get1();
+
+            expNodeId = tup.get2();
+
+            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+
+            doTestScanQuery(qry);
+        }
+        finally {
+            stopAllGrids();
+        }
     }
 
     /**
+     * Scan should try the first remote node and then fall back to the second remote node.
+     *
      * @throws Exception If failed.
      */
-    protected void doTestScanPartition() throws Exception {
-        try {
-            Ignite ignite = startGrids(GRID_CNT);
+    public void testScanFallback() throws Exception {
+        cacheMode = CacheMode.PARTITIONED;
+        backups = 1;
+        commSpiFactory = new TestFallbackCommunicationSpiFactory();
 
-            IgniteCacheProxy<Integer, Integer> cache =
-                (IgniteCacheProxy<Integer, Integer>)ignite.<Integer, Integer>cache(null);
+        final Set<Integer> candidates = new TreeSet<>();
 
-            Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+        final AtomicBoolean test = new AtomicBoolean(false);
 
-            for (int i = 0; i < KEYS_CNT; i++) {
-                cache.put(i, i);
+        for(int j = 0; j < 2; j++) {
+            clientMode = true;
 
-                int part = cache.context().affinity().partition(i);
+            latch = new CountDownLatch(1);
 
-                Map<Integer, Integer> partEntries = entries.get(part);
+            try {
+                final Ignite ignite0 = startGrid(0);
 
-                if (partEntries == null)
-                    entries.put(part, partEntries = new HashMap<>());
+                clientMode = false;
 
-                partEntries.put(i, i);
-            }
+                final IgniteEx ignite1 = startGrid(1);
+                final IgniteEx ignite2 = startGrid(2);
+                startGrid(3);
 
-            IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context(), true);
+                if (test.get()) {
+                    expNodeId = ignite1.localNode().id();
+                    expFallbackNodeId = ignite2.localNode().id();
+                }
 
-            int part = tup.get1();
+                final IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite0);
 
-            if (fallback)
-                failNodeId = tup.get2();
-            else
-                expNodeId = tup.get2();
+                if (!test.get()) {
+                    candidates.addAll(localPartitions(ignite1));
+                    candidates.retainAll(localPartitions(ignite2));
+                }
 
-            if (fallback)
-                expNodeId = remoteBackup(part, cache.context());
+                Runnable run = new Runnable() {
+                    @Override public void run() {
+                        try {
+                            startGrid(4);
+                            startGrid(5);
 
-            CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+                            awaitPartitionMapExchange();
+
+                            if (!test.get()) {
+                                Set<Integer> parts = localPartitions(ignite1);
+                                candidates.removeAll(parts);
+                            }
 
-            CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+                            latch.countDown();
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                        }
 
-            Collection<Map.Entry<Integer, Integer>> expEntries = fut.get();
+                    }
+                };
 
-            for (Map.Entry<Integer, Integer> e : expEntries) {
-                Map<Integer, Integer> map = entries.get(part);
+                int part;
+                CacheQuery<Map.Entry<Integer, Integer>> qry = null;
 
-                if(map == null)
-                    assertTrue(expEntries.isEmpty());
+                if (test.get()) {
+                    part = F.first(candidates);
+
+                    qry = cache.context().queries().createScanQuery(null, part, false);
+                }
+
+                new Thread(run).start();
+
+                if (test.get())
+                    doTestScanQuery(qry);
                 else
-                    assertEquals(map.get(e.getKey()), e.getValue());
+                    latch.await();
+            }
+            finally {
+                test.set(true);
+
+                stopAllGrids();
             }
         }
-        finally {
-            stopAllGrids();
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    protected IgniteCacheProxy<Integer, Integer> fillCache(Ignite ignite) {
+        IgniteCacheProxy<Integer, Integer> cache =
+            (IgniteCacheProxy<Integer, Integer>)ignite.<Integer, Integer>cache(null);
+
+        for (int i = 0; i < KEYS_CNT; i++) {
+            cache.put(i, i);
+
+            int part = cache.context().affinity().partition(i);
+
+            Map<Integer, Integer> partEntries = entries.get(part);
+
+            if (partEntries == null)
+                entries.put(part, partEntries = new HashMap<>());
+
+            partEntries.put(i, i);
         }
+
+        return cache;
+    }
+
+    /**
+     * @param qry Query.
+     */
+    protected void doTestScanQuery(
+        CacheQuery<Map.Entry<Integer, Integer>> qry) throws IgniteCheckedException {
+        CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
+
+        Collection<Map.Entry<Integer, Integer>> expEntries = fut.get();
+
+        for (Map.Entry<Integer, Integer> e : expEntries) {
+            Map<Integer, Integer> map = entries.get(((GridCacheQueryAdapter)qry).partition());
+
+            if (map == null)
+                assertTrue(expEntries.isEmpty());
+            else
+                assertEquals(map.get(e.getKey()), e.getValue());
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {
+        return F.first(cctx.topology().localPartitions()).id();
     }
 
     /**
      * @param cctx Cctx.
-     * @param primary Primary.
      */
-    private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx, boolean primary) {
+    private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx) {
         ClusterNode node = F.first(cctx.kernalContext().grid().cluster().forRemotes().nodes());
 
         GridCacheAffinityManager affMgr = cctx.affinity();
 
         AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion();
 
-        Set<Integer> parts = primary ?
-            affMgr.primaryPartitions(node.id(), topVer) : affMgr.backupPartitions(node.id(), topVer);
+        Set<Integer> parts = affMgr.primaryPartitions(node.id(), topVer);
 
         return new IgniteBiTuple<>(F.first(parts), node.id());
     }
 
     /**
-     * @param part Partition.
-     * @param cctx Cctx.
+     * @param ignite Ignite.
      */
-    private UUID remoteBackup(int part, final GridCacheContext cctx) {
-        final UUID locUuid = cctx.localNodeId();
+    private Set<Integer> localPartitions(Ignite ignite) {
+        GridCacheContext cctx = ((IgniteCacheProxy)ignite.cache(null)).context();
+
+        Collection<GridDhtLocalPartition> owningParts = F.view(cctx.topology().localPartitions(),
+            new IgnitePredicate<GridDhtLocalPartition>() {
+                @Override public boolean apply(GridDhtLocalPartition part) {
+                    return part.state() == GridDhtPartitionState.OWNING;
+                }
+            });
+
+        return new HashSet<>(F.transform(owningParts, new IgniteClosure<GridDhtLocalPartition, Integer>() {
+            @Override public Integer apply(GridDhtLocalPartition part) {
+                return part.id();
+            }
+        }));
+    }
 
-        GridCacheAffinityManager affMgr = cctx.affinity();
+    /**
+     * Factory for test-specific communication SPI instances.
+     */
+    private interface CommunicationSpiFactory {
+        /**
+         * Creates communication SPI instance.
+         */
+        TcpCommunicationSpi create();
+    }
 
-        AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion();
+    /**
+     *
+     */
+    private static class TestLocalCommunicationSpiFactory implements CommunicationSpiFactory {
+        /** {@inheritDoc} */
+        @Override public TcpCommunicationSpi create() {
+            return new TcpCommunicationSpi() {
+                @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+                    Object origMsg = ((GridIoMessage)msg).message();
 
-        return F.first(F.view(affMgr.backups(part, topVer), new IgnitePredicate<ClusterNode>() {
-            @Override public boolean apply(ClusterNode node) {
-                return !node.id().equals(locUuid);
-            }
-        })).id();
+                    if (origMsg instanceof GridCacheQueryRequest)
+                        fail(); //should use local node
+
+                    super.sendMessage(node, msg);
+                }
+            };
+        }
     }
 
     /**
      *
      */
-    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+    private static class TestRemoteCommunicationSpiFactory implements CommunicationSpiFactory {
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg)
-            throws IgniteSpiException {
-            Object origMsg = ((GridIoMessage)msg).message();
+        @Override public TcpCommunicationSpi create() {
+            return new TcpCommunicationSpi() {
+                @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+                    Object origMsg = ((GridIoMessage)msg).message();
 
-            if (origMsg instanceof GridCacheQueryRequest) {
-                if (node.id().equals(failNodeId))
-                    throw new IgniteSpiException("");
-                else
-                    assertEquals(expNodeId, node.id());
-            }
+                    if (origMsg instanceof GridCacheQueryRequest)
+                        assertEquals(expNodeId, node.id());
+
+                    super.sendMessage(node, msg);
+                }
+            };
+        }
+    }
 
-            super.sendMessage(node, msg);
+    /**
+     *
+     */
+    private static class TestFallbackCommunicationSpiFactory implements CommunicationSpiFactory {
+        /** {@inheritDoc} */
+        @Override public TcpCommunicationSpi create() {
+            return new TcpCommunicationSpi() {
+                @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+                    Object origMsg = ((GridIoMessage)msg).message();
+
+                    if (origMsg instanceof GridCacheQueryRequest) {
+                        if (latch.getCount() > 0)
+                            assertEquals(expNodeId, node.id());
+                        else
+                            assertEquals(expFallbackNodeId, node.id());
+
+                        try {
+                            latch.await();
+                        }
+                        catch (InterruptedException e) {
+                            throw new IgniteSpiException(e);
+                        }
+                    }
+
+                    super.sendMessage(node, msg);
+                }
+            };
         }
     }
 }