You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/10 14:04:43 UTC

[08/17] ignite git commit: IGNITE-802: reworked GridCachePartitionedQueueEntryMoveSelfTest.testQueue

IGNITE-802: reworked GridCachePartitionedQueueEntryMoveSelfTest.testQueue


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ec5c795a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ec5c795a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ec5c795a

Branch: refs/heads/ignite-1282
Commit: ec5c795aa523cc48c292cfb09e422edcd8a1a42b
Parents: d96e0d2
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Sep 10 12:18:44 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Sep 10 12:18:44 2015 +0300

----------------------------------------------------------------------
 ...dCachePartitionedQueueEntryMoveSelfTest.java | 191 +++++++------------
 1 file changed, 66 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5c795a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
index 4d92b88..1d225a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.Ignite;
@@ -30,18 +29,15 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.CollectionConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteCollectionAbstractTest;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -52,11 +48,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
  * Cache queue test with changing topology.
  */
 public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollectionAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-802");
-    }
-
     /** Queue capacity. */
     private static final int QUEUE_CAP = 5;
 
@@ -66,9 +57,6 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
     /** Backups count. */
     private static final int BACKUP_CNT = 1;
 
-    /** Node ID to set manually on node startup. */
-    private UUID nodeId;
-
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return GRID_CNT;
@@ -98,116 +86,93 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
         return colCfg;
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings("deprecation")
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        if (nodeId != null) {
-            cfg.setNodeId(nodeId);
-
-            nodeId = null;
-        }
-
-        return cfg;
-    }
-
     /**
      * @throws Exception If failed.
      */
     public void testQueue() throws Exception {
-        try {
-            startGrids(GRID_CNT);
-
-            final String queueName = "queue-name-" + UUID.randomUUID();
+        final String queueName = "queue-test-name";
 
-            System.out.println(U.filler(20, '\n'));
+        System.out.println(U.filler(20, '\n'));
 
-            final CountDownLatch latch1 = new CountDownLatch(1);
-            //final CountDownLatch latch2 = new CountDownLatch(1);
+        final CountDownLatch latch1 = new CountDownLatch(1);
+        final CountDownLatch latch2 = new CountDownLatch(1);
 
-            IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
-                @Override public Void call() {
-                    Ignite ignite = grid(0);
+        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws IgniteInterruptedCheckedException {
+                Ignite ignite = grid(0);
 
-                    IgniteQueue<Integer> queue = ignite.queue(queueName,
-                        QUEUE_CAP,
-                        config(true));
+                IgniteQueue<Integer> queue = ignite.queue(queueName, QUEUE_CAP, config(true));
 
-                    for (int i = 0; i < QUEUE_CAP * 2; i++) {
-                        if (i == QUEUE_CAP) {
-                            latch1.countDown();
+                for (int i = 0; i < QUEUE_CAP * 2; i++) {
+                    if (i == QUEUE_CAP) {
+                        latch1.countDown();
 
-                            //U.await(latch2);
-                        }
-
-                        try {
-                            info(">>> Putting value: " + i);
+                        U.await(latch2);
+                    }
 
-                            queue.put(i);
+                    try {
+                        info(">>> Putting value: " + i);
 
-                            info(">>> Value is in queue: " + i);
-                        }
-                        catch (Error | RuntimeException e) {
-                            error("Failed to put value: " + i, e);
+                        queue.put(i);
 
-                            throw e;
-                        }
+                        info(">>> Value is in queue: " + i);
                     }
+                    catch (Error | RuntimeException e) {
+                        error("Failed to put value: " + i, e);
 
-                    return null;
+                        throw e;
+                    }
                 }
-            });
 
-            latch1.await();
+                return null;
+            }
+        });
 
-            startAdditionalNodes(BACKUP_CNT + 2, queueName);
+        latch1.await();
 
-            System.out.println(U.filler(20, '\n'));
+        startAdditionalNodes(BACKUP_CNT + 2, queueName);
 
-            //latch2.countDown();
+        System.out.println(U.filler(20, '\n'));
 
-            IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
-                @Override public Void call() throws IgniteCheckedException {
-                    Ignite ignite = grid(GRID_CNT);
+        latch2.countDown();
 
-                    IgniteQueue<Integer> queue = ignite.queue(queueName, Integer.MAX_VALUE, config(true));
+        IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws IgniteCheckedException {
+                Ignite ignite = grid(GRID_CNT);
 
-                    int cnt = 0;
+                IgniteQueue<Integer> queue = ignite.queue(queueName, QUEUE_CAP, config(true));
 
-                    do {
-                        try {
-                            Integer i = queue.poll();
+                int cnt = 0;
 
-                            if (i != null) {
-                                info(">>> Polled value: " + cnt);
+                do {
+                    try {
+                        Integer i = queue.poll();
 
-                                cnt++;
-                            }
-                            else {
-                                info(">>> Waiting for value...");
+                        if (i != null) {
+                            info(">>> Polled value: " + cnt);
 
-                                U.sleep(2000);
-                            }
+                            cnt++;
                         }
-                        catch (Error | RuntimeException e) {
-                            error("Failed to poll value.", e);
+                        else {
+                            info(">>> Waiting for value...");
 
-                            throw e;
+                            U.sleep(2000);
                         }
                     }
-                    while (cnt < QUEUE_CAP * 2);
+                    catch (Error | RuntimeException e) {
+                        error("Failed to poll value.", e);
 
-                    return null;
+                        throw e;
+                    }
                 }
-            });
+                while (cnt < QUEUE_CAP * 2);
 
-            fut1.get();
-            fut2.get();
-        }
-        finally {
-            stopAllGrids();
-        }
+                return null;
+            }
+        });
+
+        fut1.get();
+        fut2.get();
     }
 
     /**
@@ -218,51 +183,27 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
      * @throws Exception If failed.
      */
     private void startAdditionalNodes(int cnt, String queueName) throws Exception {
-        AffinityFunction aff = jcache(0).getConfiguration(CacheConfiguration.class).getAffinity();
-        AffinityKeyMapper mapper = jcache(0).getConfiguration(CacheConfiguration.class).getAffinityMapper();
-
-        assertNotNull(aff);
-        assertNotNull(mapper);
-
-        int part = aff.partition(mapper.affinityKey(queueName));
+        IgniteQueue queue = ignite(0).queue(queueName, 0, null);
 
-        Collection<ClusterNode> nodes = grid(0).cluster().nodes();
+        CacheConfiguration cCfg = getQueueCache(queue);
 
-        Collection<ClusterNode> aff0 = ignite(0).affinity(null).mapKeyToPrimaryAndBackups(queueName);
-        Collection<ClusterNode> aff1 = nodes(aff, part, nodes);
+        Collection<ClusterNode> aff1 = ignite(0).affinity(cCfg.getName()).mapKeyToPrimaryAndBackups(queueName);
 
-        assertEquals(new ArrayList<>(aff0), new ArrayList<>(aff1));
+        for (int i = 0, id = GRID_CNT; i < cnt; i++) {
+            startGrid(id++);
 
-        Collection<ClusterNode> aff2;
-        Collection<ClusterNode> tmpNodes;
+            awaitPartitionMapExchange();
 
-        int retries = 10000;
+            Collection<ClusterNode> aff2 = ignite(0).affinity(cCfg.getName()).mapKeyToPrimaryAndBackups(queueName);
 
-        do {
-            tmpNodes = new ArrayList<>(cnt);
+            if (!aff1.iterator().next().equals(aff2.iterator().next())) {
+                info("Moved queue to new primary node [oldAff=" + aff1 + ", newAff=" + aff2 + ']');
 
-            for (int i = 0; i < cnt; i++)
-                tmpNodes.add(new GridTestNode(UUID.randomUUID()));
-
-            aff2 = nodes(aff, part, F.concat(true, tmpNodes, nodes));
-
-            if (retries-- < 0)
-                throw new IgniteCheckedException("Failed to find node IDs to change current affinity mapping.");
+                return;
+            }
         }
-        while (F.containsAny(aff1, aff2));
-
-        int i = GRID_CNT;
-
-        // Start several additional grids.
-        for (UUID id : F.nodeIds(tmpNodes)) {
-            nodeId = id;
-
-            startGrid(i++);
-        }
-
-        aff2 = ignite(0).affinity(null).mapKeyToPrimaryAndBackups(queueName);
 
-        assertFalse("Unexpected affinity [aff1=" + aff1 + ", aff2=" + aff2 + ']', F.containsAny(aff1, aff2));
+        throw new IgniteCheckedException("Unable to move the queue to a new primary node");
     }
 
     /**