You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/14 12:33:25 UTC
ignite git commit: ignite-801: properly processing cluster topology exception when atomic stamped value is being modified. Finished refactoring the tests.
Repository: ignite
Updated Branches:
refs/heads/ignite-801 de632ace6 -> df42d4a94
ignite-801: properly process the cluster topology exception when an atomic stamped value is being modified. Finished refactoring the tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/df42d4a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/df42d4a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/df42d4a9
Branch: refs/heads/ignite-801
Commit: df42d4a944178999594fd79fbe0ce45db4596442
Parents: de632ac
Author: Denis Magda <dm...@gridgain.com>
Authored: Mon Sep 14 13:33:16 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Mon Sep 14 13:33:16 2015 +0300
----------------------------------------------------------------------
.../datastructures/DataStructuresProcessor.java | 23 +-
.../GridCacheCountDownLatchImpl.java | 1 +
...eAbstractDataStructuresFailoverSelfTest.java | 754 +++++++------------
...edOffheapDataStructuresFailoverSelfTest.java | 2 -
...eplicatedDataStructuresFailoverSelfTest.java | 5 -
5 files changed, 286 insertions(+), 499 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index a5561e9..ef66635 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -489,21 +489,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (dataStructure != null)
return dataStructure;
- if (!create)
- return c.applyx();
-
while (true) {
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+ try {
+ if (!create)
+ return c.applyx();
- if (err != null)
- throw err;
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
- dataStructure = c.applyx();
+ if (err != null)
+ throw err;
- tx.commit();
+ dataStructure = c.applyx();
+
+ tx.commit();
- return dataStructure;
+ return dataStructure;
+ }
}
catch (ClusterTopologyCheckedException e) {
IgniteInternalFuture<?> fut = e.retryReadyFuture();
@@ -513,6 +515,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
catch (IgniteTxRollbackCheckedException ignore) {
// Safe to retry right away.
}
+
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index cdd5f90..01d8c58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -344,6 +344,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
@Override public Integer call() throws Exception {
Integer val;
+ //REMOVE TR
try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheCountDownLatchValue latchVal = latchView.get(key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 2fd40f6..0b12d63 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.datastructures;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicReference;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
@@ -65,6 +68,15 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/** */
private static final int TOP_CHANGE_THREAD_CNT = 3;
+ /** */
+ private static final int TOP_CHANGED_ERR_RETRY_CNT = 5;
+
+ /** */
+ private static final long TOP_CHANGED_ERR_RETRY_TIMEOUT = 3000;
+
+ /** */
+ private static final long READY_FUTURE_WAIT_TIMEOUT = 10_000;
+
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return TEST_TIMEOUT;
@@ -127,13 +139,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.atomicLong(STRUCTURE_NAME, 10, true).get() == 10;
+ assertEquals(10, g.atomicLong(STRUCTURE_NAME, 10, false).get());
- assert g.atomicLong(STRUCTURE_NAME, 10, true).addAndGet(10) == 20;
+ assertEquals(20, g.atomicLong(STRUCTURE_NAME, 10, false).addAndGet(10));
stopGrid(NEW_GRID_NAME);
- assert grid(0).atomicLong(STRUCTURE_NAME, 10, true).get() == 20;
+ assertEquals(20, grid(0).atomicLong(STRUCTURE_NAME, 10, true).get());
}
}
@@ -141,97 +153,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicLongConstantTopologyChange() throws Exception {
- try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override
- public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- long val = s.get();
-
- while (!fut.isDone()) {
- assert s.get() == val;
-
- assert s.incrementAndGet() == val + 1;
-
- val++;
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids())
- assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get());
- }
+ doTestAtomicLong(new ConstantTopologyChangeWorker());
}
/**
* @throws Exception If failed.
*/
public void testAtomicLongConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
+ doTestAtomicLong(new ConstantMultipleTopologyChangeWorker());
+ }
- Ignite g = startGrid(name);
+ /**
+ * Tests IgniteAtomicLong.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicLong(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
- assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
long val = s.get();
while (!fut.isDone()) {
- assert s.get() == val;
+ assertEquals(val, s.get());
- assert s.incrementAndGet() == val + 1;
-
- val++;
+ assertEquals(++val, s.incrementAndGet());
}
fut.get();
for (Ignite g : G.allGrids())
- assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get());
+ assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, false).get());
}
}
@@ -242,13 +201,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try (IgniteAtomicReference atomic = grid(0).atomicReference(STRUCTURE_NAME, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.atomicReference(STRUCTURE_NAME, 10, true).get() == 10;
+ assertEquals((Integer)10, g.atomicReference(STRUCTURE_NAME, 10, false).get());
- g.atomicReference(STRUCTURE_NAME, 10, true).set(20);
+ g.atomicReference(STRUCTURE_NAME, 10, false).set(20);
stopGrid(NEW_GRID_NAME);
- assertEquals(20, (int) grid(0).atomicReference(STRUCTURE_NAME, 10, true).get());
+ assertEquals((Integer)20, grid(0).atomicReference(STRUCTURE_NAME, 10, true).get());
}
}
@@ -256,85 +215,36 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicReferenceConstantTopologyChange() throws Exception {
- try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override
- public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- int val = s.get();
-
- while (!fut.isDone()) {
- assert s.get() == val;
-
- s.set(++val);
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids())
- assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get());
- }
+ doTestAtomicReference(new ConstantTopologyChangeWorker());
}
/**
* @throws Exception If failed.
*/
public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
+ doTestAtomicReference(new ConstantMultipleTopologyChangeWorker());
+ }
- Ignite g = startGrid(name);
+ /**
+ * Tests atomic reference.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicReference(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.atomicReference(STRUCTURE_NAME, 1, false).get() > 0;
- assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.get();
while (!fut.isDone()) {
- assert s.get() == val;
+ assertEquals(val, (int)s.get());
s.set(++val);
}
@@ -342,7 +252,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
fut.get();
for (Ignite g : G.allGrids())
- assert g.atomicReference(STRUCTURE_NAME, 1, true).get() == val;
+ assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get());
}
}
@@ -353,19 +263,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try (IgniteAtomicStamped atomic = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, true).get();
+ IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, false).get();
- assert t.get1() == 10;
- assert t.get2() == 10;
+ assertEquals((Integer)10, t.get1());
+ assertEquals((Integer)10, t.get2());
- g.atomicStamped(STRUCTURE_NAME, 10, 10, true).set(20, 20);
+ g.atomicStamped(STRUCTURE_NAME, 10, 10, false).set(20, 20);
stopGrid(NEW_GRID_NAME);
- t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true).get();
+ t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, false).get();
- assert t.get1() == 20;
- assert t.get2() == 20;
+ assertEquals((Integer)20, t.get1());
+ assertEquals((Integer)20, t.get2());
}
}
@@ -373,107 +283,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicStampedConstantTopologyChange() throws Exception {
- try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override
- public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- IgniteBiTuple<Integer, Integer> t =
- g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
-
- assert t.get1() > 0;
- assert t.get2() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- int val = s.value();
-
- while (!fut.isDone()) {
- IgniteBiTuple<Integer, Integer> t = s.get();
-
- assert t.get1() == val;
- assert t.get2() == val;
-
- val++;
-
- s.set(val, val);
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids()) {
- IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
-
- assert t.get1() == val;
- assert t.get2() == val;
- }
- }
+ doTestAtomicStamped(new ConstantTopologyChangeWorker());
}
/**
* @throws Exception If failed.
*/
public void testAtomicStampedConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
+ doTestAtomicStamped(new ConstantMultipleTopologyChangeWorker());
+ }
- Ignite g = startGrid(name);
+ /**
+ * Tests atomic stamped value.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicStamped(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ IgniteBiTuple<Integer, Integer> t = ignite.atomicStamped(STRUCTURE_NAME, 1, 1, false).get();
- IgniteBiTuple<Integer, Integer> t =
- g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
+ assert t.get1() > 0;
+ assert t.get2() > 0;
- assert t.get1() > 0;
- assert t.get2() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.value();
while (!fut.isDone()) {
IgniteBiTuple<Integer, Integer> t = s.get();
- assert t.get1() == val;
- assert t.get2() == val;
+ assertEquals(val, (int)t.get1());
+ assertEquals(val, (int)t.get2());
- val++;
+ ++val;
s.set(val, val);
}
@@ -481,10 +328,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
fut.get();
for (Ignite g : G.allGrids()) {
- IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
+ IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, false).get();
- assert t.get1() == val;
- assert t.get2() == val;
+ assertEquals(val, (int)t.get1());
+ assertEquals(val, (int)t.get2());
}
}
}
@@ -497,16 +344,16 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 20;
+ assertEquals(20, g.countDownLatch(STRUCTURE_NAME, 20, true, false).count());
- g.countDownLatch(STRUCTURE_NAME, 20, true, true).countDown(10);
+ g.countDownLatch(STRUCTURE_NAME, 20, true, false).countDown(10);
stopGrid(NEW_GRID_NAME);
- assert grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 10;
+ assertEquals(10, grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).count());
}
finally {
- grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).countDownAll();
+ grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).countDownAll();
}
}
}
@@ -515,102 +362,45 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testCountDownLatchConstantTopologyChange() throws Exception {
- try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) {
- try {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- int val = s.count();
-
- while (!fut.isDone()) {
- assert s.count() == val;
-
- assert s.countDown() == val - 1;
-
- val--;
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids())
- assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count() == val;
- }
- finally {
- grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll();
- }
- }
+ doTestCountDownLatch(new ConstantTopologyChangeWorker());
}
/**
* @throws Exception If failed.
*/
public void testCountDownLatchConstantMultipleTopologyChange() throws Exception {
+ doTestCountDownLatch(new ConstantMultipleTopologyChangeWorker());
+ }
+
+ /**
+ * Tests distributed count down latch.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestCountDownLatch(ConstantTopologyChangeWorker topWorker) throws Exception {
try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) {
try {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
-
- Ignite g = startGrid(name);
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(
+ new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count() > 0;
- assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
+ return null;
}
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.count();
while (!fut.isDone()) {
- assert s.count() == val;
-
- assert s.countDown() == val - 1;
-
- val--;
+ assertEquals(val, s.count());
+ assertEquals(--val, s.countDown());
}
fut.get();
for (Ignite g : G.allGrids())
- assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count());
+ assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count());
}
finally {
grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll();
@@ -627,13 +417,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).poll() == 10;
+ assertEquals(10, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).poll());
g.queue(STRUCTURE_NAME, 0, null).put(20);
stopGrid(NEW_GRID_NAME);
- assert grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek() == 20;
+ assertEquals(20, (int)grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek());
}
finally {
grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).close();
@@ -644,31 +434,33 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testQueueConstantTopologyChange() throws Exception {
+ doTestQueue(new ConstantTopologyChangeWorker());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueConstantMultipleTopologyChange() throws Exception {
+ doTestQueue(new ConstantMultipleTopologyChangeWorker());
+ }
+
+ /**
+ * Tests the queue.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestQueue(ConstantTopologyChangeWorker topWorker) throws Exception {
try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
s.put(1);
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.peek();
@@ -680,71 +472,71 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
fut.get();
for (Ignite g : G.allGrids())
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
+ assertEquals(origVal, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek());
}
}
/**
* @throws Exception If failed.
*/
- public void testQueueConstantMultipleTopologyChange() throws Exception {
- try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
- s.put(1);
+ public void testAtomicSequenceInitialization() throws Exception {
+ int threadCnt = 3;
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
+ final AtomicInteger idx = new AtomicInteger(gridCount());
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ int id = idx.getAndIncrement();
- names.add(name);
+ try {
+ startGrid(id);
- Ignite g = startGrid(name);
+ Thread.sleep(1000);
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ catch (Exception e) {
+ throw F.wrap(e);
+ }
+ finally {
+ stopGrid(id);
- int val = s.peek();
+ info("Thread finished.");
+ }
+ }
+ }, threadCnt, "test-thread");
- int origVal = val;
+ while (!fut.isDone()) {
+ grid(0).compute().call(new IgniteCallable<Object>() {
+ /** */
+ @IgniteInstanceResource
+ private Ignite g;
- while (!fut.isDone())
- s.put(++val);
+ @Override public Object call() throws Exception {
+ IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
- fut.get();
+ assert seq != null;
- for (Ignite g : G.allGrids())
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
+ for (int i = 0; i < 1000; i++)
+ seq.getAndIncrement();
+
+ return null;
+ }
+ });
}
+
+ fut.get();
}
/**
* @throws Exception If failed.
*/
public void testAtomicSequenceTopologyChange() throws Exception {
- try (IgniteAtomicSequence s = grid().atomicSequence(STRUCTURE_NAME, 10, true)) {
+ try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.atomicSequence(STRUCTURE_NAME, 10, false).get() == 1010;
+ assertEquals(1010, g.atomicSequence(STRUCTURE_NAME, 10, false).get());
- assert g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10) == 1020;
+ assertEquals(1020, g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10));
stopGrid(NEW_GRID_NAME);
}
@@ -754,29 +546,31 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicSequenceConstantTopologyChange() throws Exception {
- try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- String name = UUID.randomUUID().toString();
+ doTestAtomicSequence(new ConstantTopologyChangeWorker());
+ }
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- try {
- Ignite g = startGrid(name);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
+ doTestAtomicSequence(new ConstantMultipleTopologyChangeWorker());
+ }
- assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ /**
+ * Tests atomic sequence.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicSequence(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assertTrue(ignite.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
+
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
long old = s.get();
@@ -797,135 +591,131 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/**
* @throws Exception If failed.
*/
- public void testAtomicSequenceInitialization() throws Exception {
- int threadCnt = 3;
+ public void testUncommitedTxLeave() throws Exception {
+ final int val = 10;
- final AtomicInteger idx = new AtomicInteger(gridCount());
+ grid(0).atomicLong(STRUCTURE_NAME, val, true);
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- int id = idx.getAndIncrement();
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Ignite g = startGrid(NEW_GRID_NAME);
try {
- startGrid(id);
+ g.transactions().txStart();
- Thread.sleep(1000);
+ g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1);
- }
- catch (Exception e) {
- throw F.wrap(e);
+ assertEquals(val + 1, g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet());
}
finally {
- stopGrid(id);
-
- info("Thread finished.");
+ stopGrid(NEW_GRID_NAME);
}
- }
- }, threadCnt, "test-thread");
-
- while (!fut.isDone()) {
- grid(0).compute().call(new IgniteCallable<Object>() {
- /** */
- @IgniteInstanceResource
- private Ignite g;
-
- @Override public Object call() throws Exception {
- IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
- assert seq != null;
-
- for (int i = 0; i < 1000; i++)
- seq.getAndIncrement();
+ return null;
+ }
+ }).get();
- return null;
- }
- });
- }
+ waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()]));
- fut.get();
+ assertEquals(val + 1, grid(0).atomicLong(STRUCTURE_NAME, val, false).get());
}
/**
- * @throws Exception If failed.
+ *
*/
- public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
+ private class ConstantTopologyChangeWorker {
+ /** */
+ protected final AtomicBoolean failed = new AtomicBoolean(false);
+
+ /**
+ * Starts changing cluster's topology.
+ *
+ * @return Future.
+ */
+ IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@Override public void apply() {
try {
for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
+ if (failed.get())
+ return;
- names.add(name);
+ String name = UUID.randomUUID().toString();
- Ignite g = startGrid(name);
+ try {
+ Ignite g = startGrid(name);
- assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
- }
+ callback.apply(g);
}
finally {
if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
+ stopGrid(name);
}
}
}
catch (Exception e) {
+ failed.set(true);
+
throw F.wrap(e);
}
}
}, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
- long old = s.get();
-
- while (!fut.isDone()) {
- assertEquals(old, s.get());
-
- long val = s.incrementAndGet();
-
- assertTrue(val > old);
-
- old = val;
- }
-
- fut.get();
+ return fut;
}
}
/**
- * @throws Exception If failed.
+ *
*/
- public void testUncommitedTxLeave() throws Exception {
- final int val = 10;
+ private class ConstantMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
+ /**
+ * Starts changing cluster's topology.
+ *
+ * @return Future.
+ */
+ @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ try {
+ for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ if (failed.get())
+ return;
- grid(0).atomicLong(STRUCTURE_NAME, val, true);
+ Collection<String> names = new GridLeanSet<>(3);
- GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Ignite g = startGrid(NEW_GRID_NAME);
+ try {
+ for (int j = 0; j < 3; j++) {
+ if (failed.get())
+ return;
- try {
- g.transactions().txStart();
+ String name = UUID.randomUUID().toString();
+ Ignite g = startGrid(name);
- g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1);
+ names.add(name);
- assert g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet() == val + 1;
- }
- finally {
- stopGrid(NEW_GRID_NAME);
- }
+ callback.apply(g);
+ }
+ }
+ finally {
+ if (i != TOP_CHANGE_CNT - 1) {
- return null;
- }
- }).get();
+ for (String name : names)
+ stopGrid(name);
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ failed.set(true);
- waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()]));
+ throw F.wrap(e);
+ }
+ }
+ }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
- assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1;
+ return fut;
+ }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
index 86b763a..a9cd470 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
@@ -34,6 +34,4 @@ public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest extends G
@Override protected CacheMemoryMode collectionMemoryMode() {
return OFFHEAP_TIERED;
}
-
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/df42d4a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
index 69de7cd..902ba44 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
@@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
public class GridCacheReplicatedDataStructuresFailoverSelfTest
extends GridCacheAbstractDataStructuresFailoverSelfTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-801");
- }
-
- /** {@inheritDoc} */
@Override protected CacheMode collectionCacheMode() {
return REPLICATED;
}