You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/14 11:11:11 UTC
[02/28] ignite git commit: IGNITE-802: reworked
GridCachePartitionedQueueEntryMoveSelfTest.testQueue
IGNITE-802: reworked GridCachePartitionedQueueEntryMoveSelfTest.testQueue
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ec5c795a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ec5c795a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ec5c795a
Branch: refs/heads/ignite-971
Commit: ec5c795aa523cc48c292cfb09e422edcd8a1a42b
Parents: d96e0d2
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Sep 10 12:18:44 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Sep 10 12:18:44 2015 +0300
----------------------------------------------------------------------
...dCachePartitionedQueueEntryMoveSelfTest.java | 191 +++++++------------
1 file changed, 66 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5c795a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
index 4d92b88..1d225a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
@@ -30,18 +29,15 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteCollectionAbstractTest;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -52,11 +48,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
* Cache queue test with changing topology.
*/
public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollectionAbstractTest {
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-802");
- }
-
/** Queue capacity. */
private static final int QUEUE_CAP = 5;
@@ -66,9 +57,6 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
/** Backups count. */
private static final int BACKUP_CNT = 1;
- /** Node ID to set manually on node startup. */
- private UUID nodeId;
-
/** {@inheritDoc} */
@Override protected int gridCount() {
return GRID_CNT;
@@ -98,116 +86,93 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
return colCfg;
}
- /** {@inheritDoc} */
- @SuppressWarnings("deprecation")
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- if (nodeId != null) {
- cfg.setNodeId(nodeId);
-
- nodeId = null;
- }
-
- return cfg;
- }
-
/**
* @throws Exception If failed.
*/
public void testQueue() throws Exception {
- try {
- startGrids(GRID_CNT);
-
- final String queueName = "queue-name-" + UUID.randomUUID();
+ final String queueName = "queue-test-name";
- System.out.println(U.filler(20, '\n'));
+ System.out.println(U.filler(20, '\n'));
- final CountDownLatch latch1 = new CountDownLatch(1);
- //final CountDownLatch latch2 = new CountDownLatch(1);
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
- IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() {
- Ignite ignite = grid(0);
+ IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws IgniteInterruptedCheckedException {
+ Ignite ignite = grid(0);
- IgniteQueue<Integer> queue = ignite.queue(queueName,
- QUEUE_CAP,
- config(true));
+ IgniteQueue<Integer> queue = ignite.queue(queueName, QUEUE_CAP, config(true));
- for (int i = 0; i < QUEUE_CAP * 2; i++) {
- if (i == QUEUE_CAP) {
- latch1.countDown();
+ for (int i = 0; i < QUEUE_CAP * 2; i++) {
+ if (i == QUEUE_CAP) {
+ latch1.countDown();
- //U.await(latch2);
- }
-
- try {
- info(">>> Putting value: " + i);
+ U.await(latch2);
+ }
- queue.put(i);
+ try {
+ info(">>> Putting value: " + i);
- info(">>> Value is in queue: " + i);
- }
- catch (Error | RuntimeException e) {
- error("Failed to put value: " + i, e);
+ queue.put(i);
- throw e;
- }
+ info(">>> Value is in queue: " + i);
}
+ catch (Error | RuntimeException e) {
+ error("Failed to put value: " + i, e);
- return null;
+ throw e;
+ }
}
- });
- latch1.await();
+ return null;
+ }
+ });
- startAdditionalNodes(BACKUP_CNT + 2, queueName);
+ latch1.await();
- System.out.println(U.filler(20, '\n'));
+ startAdditionalNodes(BACKUP_CNT + 2, queueName);
- //latch2.countDown();
+ System.out.println(U.filler(20, '\n'));
- IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws IgniteCheckedException {
- Ignite ignite = grid(GRID_CNT);
+ latch2.countDown();
- IgniteQueue<Integer> queue = ignite.queue(queueName, Integer.MAX_VALUE, config(true));
+ IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws IgniteCheckedException {
+ Ignite ignite = grid(GRID_CNT);
- int cnt = 0;
+ IgniteQueue<Integer> queue = ignite.queue(queueName, QUEUE_CAP, config(true));
- do {
- try {
- Integer i = queue.poll();
+ int cnt = 0;
- if (i != null) {
- info(">>> Polled value: " + cnt);
+ do {
+ try {
+ Integer i = queue.poll();
- cnt++;
- }
- else {
- info(">>> Waiting for value...");
+ if (i != null) {
+ info(">>> Polled value: " + cnt);
- U.sleep(2000);
- }
+ cnt++;
}
- catch (Error | RuntimeException e) {
- error("Failed to poll value.", e);
+ else {
+ info(">>> Waiting for value...");
- throw e;
+ U.sleep(2000);
}
}
- while (cnt < QUEUE_CAP * 2);
+ catch (Error | RuntimeException e) {
+ error("Failed to poll value.", e);
- return null;
+ throw e;
+ }
}
- });
+ while (cnt < QUEUE_CAP * 2);
- fut1.get();
- fut2.get();
- }
- finally {
- stopAllGrids();
- }
+ return null;
+ }
+ });
+
+ fut1.get();
+ fut2.get();
}
/**
@@ -218,51 +183,27 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
* @throws Exception If failed.
*/
private void startAdditionalNodes(int cnt, String queueName) throws Exception {
- AffinityFunction aff = jcache(0).getConfiguration(CacheConfiguration.class).getAffinity();
- AffinityKeyMapper mapper = jcache(0).getConfiguration(CacheConfiguration.class).getAffinityMapper();
-
- assertNotNull(aff);
- assertNotNull(mapper);
-
- int part = aff.partition(mapper.affinityKey(queueName));
+ IgniteQueue queue = ignite(0).queue(queueName, 0, null);
- Collection<ClusterNode> nodes = grid(0).cluster().nodes();
+ CacheConfiguration cCfg = getQueueCache(queue);
- Collection<ClusterNode> aff0 = ignite(0).affinity(null).mapKeyToPrimaryAndBackups(queueName);
- Collection<ClusterNode> aff1 = nodes(aff, part, nodes);
+ Collection<ClusterNode> aff1 = ignite(0).affinity(cCfg.getName()).mapKeyToPrimaryAndBackups(queueName);
- assertEquals(new ArrayList<>(aff0), new ArrayList<>(aff1));
+ for (int i = 0, id = GRID_CNT; i < cnt; i++) {
+ startGrid(id++);
- Collection<ClusterNode> aff2;
- Collection<ClusterNode> tmpNodes;
+ awaitPartitionMapExchange();
- int retries = 10000;
+ Collection<ClusterNode> aff2 = ignite(0).affinity(cCfg.getName()).mapKeyToPrimaryAndBackups(queueName);
- do {
- tmpNodes = new ArrayList<>(cnt);
+ if (!aff1.iterator().next().equals(aff2.iterator().next())) {
+ info("Moved queue to new primary node [oldAff=" + aff1 + ", newAff=" + aff2 + ']');
- for (int i = 0; i < cnt; i++)
- tmpNodes.add(new GridTestNode(UUID.randomUUID()));
-
- aff2 = nodes(aff, part, F.concat(true, tmpNodes, nodes));
-
- if (retries-- < 0)
- throw new IgniteCheckedException("Failed to find node IDs to change current affinity mapping.");
+ return;
+ }
}
- while (F.containsAny(aff1, aff2));
-
- int i = GRID_CNT;
-
- // Start several additional grids.
- for (UUID id : F.nodeIds(tmpNodes)) {
- nodeId = id;
-
- startGrid(i++);
- }
-
- aff2 = ignite(0).affinity(null).mapKeyToPrimaryAndBackups(queueName);
- assertFalse("Unexpected affinity [aff1=" + aff1 + ", aff2=" + aff2 + ']', F.containsAny(aff1, aff2));
+ throw new IgniteCheckedException("Unable to move the queue to a new primary node");
}
/**