You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/24 09:20:15 UTC
[40/50] [abbrv] ignite git commit: ignite-801 and ignite-1911:
resurrecting data structure and atomics failover tests + stopping the node if
ring message worker fails
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 85a26ad..bc11448 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -18,8 +18,15 @@
package org.apache.ignite.internal.processors.cache.datastructures;
import java.util.Collection;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
@@ -27,20 +34,27 @@ import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -50,7 +64,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
*/
public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends IgniteCollectionAbstractTest {
/** */
- private static final long TEST_TIMEOUT = 2 * 60 * 1000;
+ private static final long TEST_TIMEOUT = 3 * 60 * 1000;
/** */
private static final String NEW_GRID_NAME = "newGrid";
@@ -67,6 +81,9 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/** */
private static final int TOP_CHANGE_THREAD_CNT = 3;
+ /** When {@code true}, the node configuration is switched to client mode with forced server-mode discovery. */
+ private boolean client;
+
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return TEST_TIMEOUT;
@@ -119,121 +136,106 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
cfg.setCacheConfiguration(ccfg);
+ if (client) {
+ cfg.setClientMode(client);
+ ((TcpDiscoverySpi)(cfg.getDiscoverySpi())).setForceServerMode(true);
+ }
+
return cfg;
}
/**
* @throws Exception If failed.
*/
- public void testAtomicLongTopologyChange() throws Exception {
- try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) {
- Ignite g = startGrid(NEW_GRID_NAME);
+ public void testAtomicLongFailsWhenServersLeft() throws Exception {
+ client = true;
+
+ Ignite ignite = startGrid(gridCount());
- assert g.atomicLong(STRUCTURE_NAME, 10, true).get() == 10;
+ new Timer().schedule(new TimerTask() {
+ @Override public void run() {
+ for (int i = 0; i < gridCount(); i++)
+ stopGrid(i);
+ }
+ }, 10_000);
- assert g.atomicLong(STRUCTURE_NAME, 10, true).addAndGet(10) == 20;
+ long stopTime = U.currentTimeMillis() + TEST_TIMEOUT / 2;
- stopGrid(NEW_GRID_NAME);
+ IgniteAtomicLong atomic = ignite.atomicLong(STRUCTURE_NAME, 10, true);
+
+ try {
+ while (U.currentTimeMillis() < stopTime)
+ assertEquals(10, atomic.get());
+ }
+ catch (IgniteException e) {
+ if (X.hasCause(e, ClusterTopologyServerNotFoundException.class))
+ return;
- assert grid(0).atomicLong(STRUCTURE_NAME, 10, true).get() == 20;
+ throw e;
}
+
+ fail();
}
/**
* @throws Exception If failed.
*/
- public void testAtomicLongConstantTopologyChange() throws Exception {
- try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override
- public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- long val = s.get();
-
- while (!fut.isDone()) {
- assert s.get() == val;
+ public void testAtomicLongTopologyChange() throws Exception {
+ try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) {
+ Ignite g = startGrid(NEW_GRID_NAME);
- assert s.incrementAndGet() == val + 1;
+ assertEquals(10, g.atomicLong(STRUCTURE_NAME, 10, false).get());
- val++;
- }
+ assertEquals(20, g.atomicLong(STRUCTURE_NAME, 10, false).addAndGet(10));
- fut.get();
+ stopGrid(NEW_GRID_NAME);
- for (Ignite g : G.allGrids())
- assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get());
+ assertEquals(20, grid(0).atomicLong(STRUCTURE_NAME, 10, true).get());
}
}
/**
* @throws Exception If failed.
*/
- public void testAtomicLongConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
+ public void testAtomicLongConstantTopologyChange() throws Exception {
+ doTestAtomicLong(new ConstantTopologyChangeWorker());
+ }
- names.add(name);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicLongConstantMultipleTopologyChange() throws Exception {
+ doTestAtomicLong(multipleTopologyChangeWorker());
+ }
- Ignite g = startGrid(name);
+ /**
+ * Tests IgniteAtomicLong.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicLong(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
- assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
long val = s.get();
while (!fut.isDone()) {
- assert s.get() == val;
-
- assert s.incrementAndGet() == val + 1;
+ assertEquals(val, s.get());
- val++;
+ assertEquals(++val, s.incrementAndGet());
}
fut.get();
for (Ignite g : G.allGrids())
- assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get());
+ assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, false).get());
}
}
@@ -244,13 +246,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try (IgniteAtomicReference atomic = grid(0).atomicReference(STRUCTURE_NAME, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.atomicReference(STRUCTURE_NAME, 10, true).get() == 10;
+ assertEquals((Integer)10, g.atomicReference(STRUCTURE_NAME, 10, false).get());
- g.atomicReference(STRUCTURE_NAME, 10, true).set(20);
+ g.atomicReference(STRUCTURE_NAME, 10, false).set(20);
stopGrid(NEW_GRID_NAME);
- assertEquals(20, (int) grid(0).atomicReference(STRUCTURE_NAME, 10, true).get());
+ assertEquals((Integer)20, grid(0).atomicReference(STRUCTURE_NAME, 10, true).get());
}
}
@@ -258,85 +260,36 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicReferenceConstantTopologyChange() throws Exception {
- try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override
- public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- int val = s.get();
-
- while (!fut.isDone()) {
- assert s.get() == val;
-
- s.set(++val);
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids())
- assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get());
- }
+ doTestAtomicReference(new ConstantTopologyChangeWorker());
}
/**
* @throws Exception If failed.
*/
public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
+ doTestAtomicReference(multipleTopologyChangeWorker());
+ }
- Ignite g = startGrid(name);
+ /**
+ * Tests atomic reference.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicReference(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.atomicReference(STRUCTURE_NAME, 1, false).get() > 0;
- assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.get();
while (!fut.isDone()) {
- assert s.get() == val;
+ assertEquals(val, (int)s.get());
s.set(++val);
}
@@ -344,7 +297,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
fut.get();
for (Ignite g : G.allGrids())
- assert g.atomicReference(STRUCTURE_NAME, 1, true).get() == val;
+ assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get());
}
}
@@ -355,19 +308,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try (IgniteAtomicStamped atomic = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, true).get();
+ IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, false).get();
- assert t.get1() == 10;
- assert t.get2() == 10;
+ assertEquals((Integer)10, t.get1());
+ assertEquals((Integer)10, t.get2());
- g.atomicStamped(STRUCTURE_NAME, 10, 10, true).set(20, 20);
+ g.atomicStamped(STRUCTURE_NAME, 10, 10, false).set(20, 20);
stopGrid(NEW_GRID_NAME);
- t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true).get();
+ t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, false).get();
- assert t.get1() == 20;
- assert t.get2() == 20;
+ assertEquals((Integer)20, t.get1());
+ assertEquals((Integer)20, t.get2());
}
}
@@ -375,107 +328,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicStampedConstantTopologyChange() throws Exception {
- try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override
- public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- IgniteBiTuple<Integer, Integer> t =
- g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
-
- assert t.get1() > 0;
- assert t.get2() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- int val = s.value();
-
- while (!fut.isDone()) {
- IgniteBiTuple<Integer, Integer> t = s.get();
-
- assert t.get1() == val;
- assert t.get2() == val;
-
- val++;
-
- s.set(val, val);
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids()) {
- IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
-
- assert t.get1() == val;
- assert t.get2() == val;
- }
- }
+ doTestAtomicStamped(new ConstantTopologyChangeWorker());
}
/**
* @throws Exception If failed.
*/
public void testAtomicStampedConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
+ doTestAtomicStamped(multipleTopologyChangeWorker());
+ }
- Ignite g = startGrid(name);
+ /**
+ * Tests atomic stamped value.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicStamped(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ IgniteBiTuple<Integer, Integer> t = ignite.atomicStamped(STRUCTURE_NAME, 1, 1, false).get();
- IgniteBiTuple<Integer, Integer> t =
- g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
+ assert t.get1() > 0;
+ assert t.get2() > 0;
- assert t.get1() > 0;
- assert t.get2() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.value();
while (!fut.isDone()) {
IgniteBiTuple<Integer, Integer> t = s.get();
- assert t.get1() == val;
- assert t.get2() == val;
+ assertEquals(val, (int)t.get1());
+ assertEquals(val, (int)t.get2());
- val++;
+ ++val;
s.set(val, val);
}
@@ -483,10 +373,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
fut.get();
for (Ignite g : G.allGrids()) {
- IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
+ IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, false).get();
- assert t.get1() == val;
- assert t.get2() == val;
+ assertEquals(val, (int)t.get1());
+ assertEquals(val, (int)t.get2());
}
}
}
@@ -499,16 +389,16 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 20;
+ assertEquals(20, g.countDownLatch(STRUCTURE_NAME, 20, true, false).count());
- g.countDownLatch(STRUCTURE_NAME, 20, true, true).countDown(10);
+ g.countDownLatch(STRUCTURE_NAME, 20, true, false).countDown(10);
stopGrid(NEW_GRID_NAME);
- assert grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 10;
+ assertEquals(10, grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).count());
}
finally {
- grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).countDownAll();
+ grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).countDownAll();
}
}
}
@@ -517,6 +407,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreTopologyChange() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
try (IgniteSemaphore semaphore = grid(0).semaphore(STRUCTURE_NAME, 20, true, true)) {
try {
@@ -541,6 +432,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreConstantTopologyChange() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
+
try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 10, false, true)) {
try {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -595,6 +488,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreConstantTopologyChangeFailoverSafe() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
+
try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true)) {
try {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -656,6 +551,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreConstantMultipleTopologyChangeFailoverSafe() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
+
final int numPermits = 3;
try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, numPermits, true, true)) {
@@ -728,6 +625,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreConstantTopologyChangeNotFailoverSafe() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
+
try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 1, false, true)) {
try {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -788,105 +687,48 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testCountDownLatchConstantTopologyChange() throws Exception {
+ doTestCountDownLatch(new ConstantTopologyChangeWorker());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCountDownLatchConstantMultipleTopologyChange() throws Exception {
+ doTestCountDownLatch(multipleTopologyChangeWorker());
+ }
+
+ /**
+ * Tests distributed count down latch.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestCountDownLatch(ConstantTopologyChangeWorker topWorker) throws Exception {
try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) {
try {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(
+ new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count() > 0;
- assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
+ return null;
}
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.count();
while (!fut.isDone()) {
- assert s.count() == val;
-
- assert s.countDown() == val - 1;
-
- val--;
+ assertEquals(val, s.count());
+ assertEquals(--val, s.countDown());
}
fut.get();
for (Ignite g : G.allGrids())
- assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count() == val;
+ assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count());
}
finally {
- grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll();
- }
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testCountDownLatchConstantMultipleTopologyChange() throws Exception {
- try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) {
- try {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
-
- Ignite g = startGrid(name);
-
- assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- int val = s.count();
-
- while (!fut.isDone()) {
- assert s.count() == val;
-
- assert s.countDown() == val - 1;
-
- val--;
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids())
- assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count());
- }
- finally {
- grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll();
+ grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll();
}
}
}
@@ -900,13 +742,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).poll() == 10;
+ assertEquals(10, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).poll());
g.queue(STRUCTURE_NAME, 0, null).put(20);
stopGrid(NEW_GRID_NAME);
- assert grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek() == 20;
+ assertEquals(20, (int)grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek());
}
finally {
grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).close();
@@ -917,107 +759,138 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testQueueConstantTopologyChange() throws Exception {
+ doTestQueue(new ConstantTopologyChangeWorker());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueConstantMultipleTopologyChange() throws Exception {
+ doTestQueue(multipleTopologyChangeWorker());
+ }
+
+ /**
+ * Tests the queue.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestQueue(ConstantTopologyChangeWorker topWorker) throws Exception {
+ int queueMaxSize = 100;
+
try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
s.put(1);
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ IgniteQueue<Integer> queue = ignite.queue(STRUCTURE_NAME, 0, null);
- try {
- Ignite g = startGrid(name);
+ assertNotNull(queue);
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ Integer val = queue.peek();
+
+ assertNotNull(val);
+
+ assert val > 0;
+
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.peek();
- int origVal = val;
+ while (!fut.isDone()) {
+ if (s.size() == queueMaxSize) {
+ int last = 0;
+
+ for (int i = 0, size = s.size() - 1; i < size; i++) {
+ int cur = s.poll();
+
+ if (i == 0) {
+ last = cur;
+
+ continue;
+ }
+
+ assertEquals(last, cur - 1);
+
+ last = cur;
+ }
+ }
- while (!fut.isDone())
s.put(++val);
+ }
fut.get();
+ val = s.peek();
+
for (Ignite g : G.allGrids())
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
+ assertEquals(val, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek());
}
}
/**
* @throws Exception If failed.
*/
- public void testQueueConstantMultipleTopologyChange() throws Exception {
- try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
- s.put(1);
+ public void testAtomicSequenceInitialization() throws Exception {
+ int threadCnt = 3;
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
+ final AtomicInteger idx = new AtomicInteger(gridCount());
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ int id = idx.getAndIncrement();
- names.add(name);
+ try {
+ startGrid(id);
- Ignite g = startGrid(name);
+ Thread.sleep(1000);
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ catch (Exception e) {
+ throw F.wrap(e);
+ }
+ finally {
+ stopGrid(id);
- int val = s.peek();
+ info("Thread finished.");
+ }
+ }
+ }, threadCnt, "test-thread");
- int origVal = val;
+ while (!fut.isDone()) {
+ grid(0).compute().call(new IgniteCallable<Object>() {
+ /** */
+ @IgniteInstanceResource
+ private Ignite g;
- while (!fut.isDone())
- s.put(++val);
+ @Override public Object call() throws Exception {
+ IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
- fut.get();
+ assert seq != null;
- for (Ignite g : G.allGrids())
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
+ for (int i = 0; i < 1000; i++)
+ seq.getAndIncrement();
+
+ return null;
+ }
+ });
}
+
+ fut.get();
}
/**
* @throws Exception If failed.
*/
public void testAtomicSequenceTopologyChange() throws Exception {
- try (IgniteAtomicSequence s = grid().atomicSequence(STRUCTURE_NAME, 10, true)) {
+ try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.atomicSequence(STRUCTURE_NAME, 10, false).get() == 1010;
+ assertEquals(1010, g.atomicSequence(STRUCTURE_NAME, 10, false).get());
- assert g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10) == 1020;
+ assertEquals(1020, g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10));
stopGrid(NEW_GRID_NAME);
}
@@ -1027,29 +900,31 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicSequenceConstantTopologyChange() throws Exception {
- try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- String name = UUID.randomUUID().toString();
+ doTestAtomicSequence(new ConstantTopologyChangeWorker());
+ }
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- try {
- Ignite g = startGrid(name);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
+ doTestAtomicSequence(multipleTopologyChangeWorker());
+ }
- assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ /**
+ * Tests atomic sequence.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicSequence(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assertTrue(ignite.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
+
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
long old = s.get();
@@ -1070,135 +945,228 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/**
* @throws Exception If failed.
*/
- public void testAtomicSequenceInitialization() throws Exception {
- int threadCnt = 3;
+ public void testUncommitedTxLeave() throws Exception {
+ final int val = 10;
- final AtomicInteger idx = new AtomicInteger(gridCount());
+ grid(0).atomicLong(STRUCTURE_NAME, val, true);
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- int id = idx.getAndIncrement();
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Ignite g = startGrid(NEW_GRID_NAME);
try {
- startGrid(id);
+ g.transactions().txStart();
- Thread.sleep(1000);
+ g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1);
- }
- catch (Exception e) {
- throw F.wrap(e);
+ assertEquals(val + 1, g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet());
}
finally {
- stopGrid(id);
-
- info("Thread finished.");
+ stopGrid(NEW_GRID_NAME);
}
+
+ return null;
}
- }, threadCnt, "test-thread");
+ }).get();
- while (!fut.isDone()) {
- grid(0).compute().call(new IgniteCallable<Object>() {
- /** */
- @IgniteInstanceResource
- private Ignite g;
+ waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()]));
- @Override public Object call() throws Exception {
- IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
+ assertEquals(val + 1, grid(0).atomicLong(STRUCTURE_NAME, val, false).get());
+ }
- assert seq != null;
+ /**
+ * @return Specific multiple topology change worker implementation.
+ */
+ private ConstantTopologyChangeWorker multipleTopologyChangeWorker() {
+ return collectionCacheMode() == CacheMode.PARTITIONED ? new PartitionedMultipleTopologyChangeWorker() :
+ new MultipleTopologyChangeWorker();
+ }
- for (int i = 0; i < 1000; i++)
- seq.getAndIncrement();
+ /**
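+ * Topology change worker: each thread repeatedly starts a node, applies the callback to it and then stops it (except on the last iteration).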
+ */
+ private class ConstantTopologyChangeWorker {
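+ /** Failure flag: set by the first worker thread that fails and checked by worker threads so they stop early. */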
+ protected final AtomicBoolean failed = new AtomicBoolean(false);
+
+ /**
+ * Starts changing cluster's topology.
+ *
+ * @return Future.
+ */
+ IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ try {
+ for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ if (failed.get())
+ return;
- return null;
+ String name = UUID.randomUUID().toString();
+
+ try {
+ Ignite g = startGrid(name);
+
+ callback.apply(g);
+ }
+ finally {
+ if (i != TOP_CHANGE_CNT - 1)
+ stopGrid(name);
+ }
+ }
}
- });
- }
+ catch (Exception e) {
+ if (failed.compareAndSet(false, true))
+ throw F.wrap(e);
+ }
+ }
+ }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
- fut.get();
+ return fut;
+ }
}
/**
- * @throws Exception If failed.
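+ * Topology change worker: each thread repeatedly starts three nodes, applies the callback to each and then stops them (except on the last iteration).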
*/
- public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
+ private class MultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
+ /**
+ * Starts changing cluster's topology.
+ *
+ * @return Future.
+ */
+ @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@Override public void apply() {
try {
for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ if (failed.get())
+ return;
+
Collection<String> names = new GridLeanSet<>(3);
try {
for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
+ if (failed.get())
+ return;
- names.add(name);
+ String name = UUID.randomUUID().toString();
Ignite g = startGrid(name);
- assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
+ names.add(name);
+
+ callback.apply(g);
}
}
finally {
- if (i != TOP_CHANGE_CNT - 1)
+ if (i != TOP_CHANGE_CNT - 1) {
for (String name : names)
stopGrid(name);
+ }
}
}
}
catch (Exception e) {
- throw F.wrap(e);
+ if (failed.compareAndSet(false, true))
+ throw F.wrap(e);
}
}
}, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
- long old = s.get();
-
- while (!fut.isDone()) {
- assertEquals(old, s.get());
-
- long val = s.incrementAndGet();
-
- assertTrue(val > old);
-
- old = val;
- }
-
- fut.get();
+ return fut;
}
}
/**
- * @throws Exception If failed.
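+ * Topology change worker used for PARTITIONED cache mode: each thread starts three nodes per iteration, then waits on a barrier whose action stops the started nodes one at a time.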
*/
- public void testUncommitedTxLeave() throws Exception {
- final int val = 10;
+ private class PartitionedMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
+ /** Barrier the worker threads wait on after starting their nodes; its action stops those nodes. */
+ private CyclicBarrier barrier;
+
+ /**
+ * Starts changing cluster's topology.
+ *
+ * @return Future.
+ */
+ @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+ final Semaphore sem = new Semaphore(TOP_CHANGE_THREAD_CNT);
+
+ final ConcurrentSkipListSet<String> startedNodes = new ConcurrentSkipListSet<>();
+
+ barrier = new CyclicBarrier(TOP_CHANGE_THREAD_CNT, new Runnable() {
+ @Override public void run() {
+ try {
+ assertEquals(TOP_CHANGE_THREAD_CNT * 3, startedNodes.size());
- grid(0).atomicLong(STRUCTURE_NAME, val, true);
+ for (String name : startedNodes) {
+ stopGrid(name, false);
- GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Ignite g = startGrid(NEW_GRID_NAME);
+ awaitPartitionMapExchange();
+ }
- try {
- g.transactions().txStart();
+ startedNodes.clear();
+ sem.release(TOP_CHANGE_THREAD_CNT);
- g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1);
+ barrier.reset();
+ }
+ catch (Exception e) {
+ if (failed.compareAndSet(false, true)) {
+ sem.release(TOP_CHANGE_THREAD_CNT);
- assert g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet() == val + 1;
- }
- finally {
- stopGrid(NEW_GRID_NAME);
+ barrier.reset();
+
+ throw F.wrap(e);
+ }
+ }
}
+ });
- return null;
- }
- }).get();
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ try {
+ for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ sem.acquire();
- waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()]));
+ if (failed.get())
+ return;
+
+ for (int j = 0; j < 3; j++) {
+ if (failed.get())
+ return;
+
+ String name = UUID.randomUUID().toString();
+
+ startedNodes.add(name);
- assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1;
+ Ignite g = startGrid(name);
+
+ callback.apply(g);
+ }
+
+ try {
+ barrier.await();
+ }
+ catch (BrokenBarrierException e) {
+ // Ignore.
+ }
+ }
+ }
+ catch (Exception e) {
+ if (failed.compareAndSet(false, true)) {
+ sem.release(TOP_CHANGE_THREAD_CNT);
+
+ barrier.reset();
+
+ throw F.wrap(e);
+ }
+ }
+ }
+ }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+
+ return fut;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
index 18b0b21..6c880a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
@@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
public class GridCachePartitionedDataStructuresFailoverSelfTest
extends GridCacheAbstractDataStructuresFailoverSelfTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-803");
- }
-
- /** {@inheritDoc} */
@Override protected CacheMode collectionCacheMode() {
return PARTITIONED;
}
@@ -50,4 +45,4 @@ public class GridCachePartitionedDataStructuresFailoverSelfTest
@Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
return TRANSACTIONAL;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
index 86b763a..b3ded7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
@@ -24,16 +24,10 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
/**
* Failover tests for cache data structures.
*/
-public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest extends GridCachePartitionedDataStructuresFailoverSelfTest {
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-803");
- }
-
+public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest
+ extends GridCachePartitionedDataStructuresFailoverSelfTest {
/** {@inheritDoc} */
@Override protected CacheMemoryMode collectionMemoryMode() {
return OFFHEAP_TIERED;
}
-
-
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
index d0131d6..28ce901 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
@@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
public class GridCacheReplicatedDataStructuresFailoverSelfTest
extends GridCacheAbstractDataStructuresFailoverSelfTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-801");
- }
-
- /** {@inheritDoc} */
@Override protected CacheMode collectionCacheMode() {
return REPLICATED;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
index 19daa26..c00557d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -150,6 +150,8 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
* @throws Exception If failed.
*/
public void testClientQueueCreateCloseFailover() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1976");
+
testFailoverWithClient(new IgniteInClosure<Ignite>() {
@Override public void apply(Ignite ignite) {
for (int i = 0; i < 100; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index caca2ca..94dc665 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -180,7 +180,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
* @return Timeout.
*/
protected long awaitForSocketWriteTimeout() {
- return 5000;
+ return 8000;
}
/**
@@ -742,4 +742,4 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
nodes.clear();
spiRsrcs.clear();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 344efc0..6b20b2a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -273,7 +273,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
}
try {
- assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS));
+ assertTrue(latch.await(failureThreshold + 3000, TimeUnit.MILLISECONDS));
assertFalse("Unexpected event, see log for details.", err.get());
assertEquals(nodeId, client.cluster().localNode().id());
@@ -331,4 +331,4 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
err = null;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 379a3a6..42960e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -373,6 +373,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private void testFailureDetectionOnNodePing(Ignite pingingNode, Ignite failedNode) throws Exception {
final CountDownLatch cnt = new CountDownLatch(1);
+ final UUID failedNodeId = failedNode.cluster().localNode().id();
+
pingingNode.events().localListen(
new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
@@ -390,9 +392,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
TcpDiscoverySpi spi = discoMap.get(pingingNode.name());
- boolean res = spi.pingNode(failedNode.cluster().localNode().id());
+ boolean res = spi.pingNode(failedNodeId);
- assertFalse("Ping is ok for node " + failedNode.cluster().localNode().id() + ", but had to fail.", res);
+ assertFalse("Ping is ok for node " + failedNodeId + ", but had to fail.", res);
// Heartbeat interval is 40 seconds, but we should detect node failure faster.
assert cnt.await(7, SECONDS);
@@ -409,6 +411,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true;
+ final UUID failedNodeId = failedNode.cluster().localNode().id();
+
final CountDownLatch pingLatch = new CountDownLatch(1);
final CountDownLatch eventLatch = new CountDownLatch(1);
@@ -422,7 +426,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
pingingNode.events().localListen(
new IgnitePredicate<Event>() {
@Override public boolean apply(Event event) {
- if (((DiscoveryEvent)event).eventNode().id().equals(failedNode.cluster().localNode().id())) {
+ if (((DiscoveryEvent)event).eventNode().id().equals(failedNodeId)) {
failRes.set(true);
eventLatch.countDown();
}
@@ -438,7 +442,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
pingLatch.countDown();
pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
- failedNode.cluster().localNode().id()));
+ failedNodeId));
return null;
}
@@ -1166,7 +1170,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
for (IgniteKernal grid : grids)
assertEquals(startTime, (Long)grid.context().discovery().gridStartTime());
- grids.add((IgniteKernal) startGrid(5));
+ grids.add((IgniteKernal)startGrid(5));
for (IgniteKernal grid : grids)
assertEquals(startTime, (Long)grid.context().discovery().gridStartTime());
@@ -1326,6 +1330,61 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception {
+ try {
+ TestMessageWorkerFailureSpi spi0 = new TestMessageWorkerFailureSpi();
+
+ nodeSpi.set(spi0);
+
+ final Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TcpDiscoverySpi());
+
+ Ignite ignite1 = startGrid(1);
+
+ final AtomicBoolean disconnected = new AtomicBoolean();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final UUID failedNodeId = ignite0.cluster().localNode().id();
+
+ ignite1.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event event) {
+ if (event.type() == EventType.EVT_NODE_FAILED &&
+ failedNodeId.equals(((DiscoveryEvent)event).eventNode().id()))
+ disconnected.set(true);
+
+ latch.countDown();
+
+ return false;
+ }
+ }, EventType.EVT_NODE_FAILED);
+
+ spi0.stop = true;
+
+ latch.await(15, TimeUnit.SECONDS);
+
+ assertTrue(disconnected.get());
+
+ try {
+ ignite0.cluster().localNode().id();
+ }
+ catch (IllegalStateException e) {
+ if (e.getMessage().contains("Grid is in invalid state to perform this operation"))
+ return;
+ }
+
+ fail();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+
+ /**
* @param twoNodes If {@code true} starts two nodes, otherwise three.
* @throws Exception If failed
*/
@@ -1891,6 +1950,25 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
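+ * Discovery SPI that throws a {@link RuntimeException} from {@code writeToSocket} once {@code stop} is set.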
+ */
+ private static class TestMessageWorkerFailureSpi extends TcpDiscoverySpi {
+ /** When {@code true}, {@code writeToSocket} throws instead of writing the message. */
+ private volatile boolean stop;
+
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+
+ if (stop)
+ throw new RuntimeException("Failing ring message worker explicitly");
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+ }
+
+ /**
* Starts new grid with given index. Method optimize is not invoked.
*
* @param idx Index of the grid to start.
@@ -1911,4 +1989,4 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private Ignite startGridNoOptimize(String gridName) throws Exception {
return G.start(getConfiguration(gridName));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 6f9c559..1fd4cb1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -856,7 +856,7 @@ public abstract class GridAbstractTest extends TestCase {
List<Ignite> ignites = G.allGrids();
for (Ignite g : ignites) {
- if (g.cluster().localNode().isClient())
+ if (g.configuration().getDiscoverySpi().isClientMode())
stopGrid(g.name(), cancel);
}
}
@@ -868,7 +868,7 @@ public abstract class GridAbstractTest extends TestCase {
List<Ignite> ignites = G.allGrids();
for (Ignite g : ignites) {
- if (!g.cluster().localNode().isClient())
+ if (!g.configuration().getDiscoverySpi().isClientMode())
stopGrid(g.name(), cancel);
}
}
@@ -2065,4 +2065,4 @@ public abstract class GridAbstractTest extends TestCase {
*/
public abstract void run(Ignite ignite, IgniteCache<K, V> cache) throws Exception;
}
-}
\ No newline at end of file
+}