You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/11/20 17:30:06 UTC
[1/3] ignite git commit: ignite-801 and ignite-1911: resurrecting
data structure and atomics failover tests + stopping the node if ring message
worker fails
Repository: ignite
Updated Branches:
refs/heads/ignite-1.5 ab8298afe -> f89347f0e
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 85a26ad..bc11448 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -18,8 +18,15 @@
package org.apache.ignite.internal.processors.cache.datastructures;
import java.util.Collection;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
@@ -27,20 +34,27 @@ import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -50,7 +64,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
*/
public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends IgniteCollectionAbstractTest {
/** */
- private static final long TEST_TIMEOUT = 2 * 60 * 1000;
+ private static final long TEST_TIMEOUT = 3 * 60 * 1000;
/** */
private static final String NEW_GRID_NAME = "newGrid";
@@ -67,6 +81,9 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/** */
private static final int TOP_CHANGE_THREAD_CNT = 3;
+ /** */
+ private boolean client;
+
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return TEST_TIMEOUT;
@@ -119,121 +136,106 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
cfg.setCacheConfiguration(ccfg);
+ if (client) {
+ cfg.setClientMode(client);
+ ((TcpDiscoverySpi)(cfg.getDiscoverySpi())).setForceServerMode(true);
+ }
+
return cfg;
}
/**
* @throws Exception If failed.
*/
- public void testAtomicLongTopologyChange() throws Exception {
- try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) {
- Ignite g = startGrid(NEW_GRID_NAME);
+ public void testAtomicLongFailsWhenServersLeft() throws Exception {
+ client = true;
+
+ Ignite ignite = startGrid(gridCount());
- assert g.atomicLong(STRUCTURE_NAME, 10, true).get() == 10;
+ new Timer().schedule(new TimerTask() {
+ @Override public void run() {
+ for (int i = 0; i < gridCount(); i++)
+ stopGrid(i);
+ }
+ }, 10_000);
- assert g.atomicLong(STRUCTURE_NAME, 10, true).addAndGet(10) == 20;
+ long stopTime = U.currentTimeMillis() + TEST_TIMEOUT / 2;
- stopGrid(NEW_GRID_NAME);
+ IgniteAtomicLong atomic = ignite.atomicLong(STRUCTURE_NAME, 10, true);
+
+ try {
+ while (U.currentTimeMillis() < stopTime)
+ assertEquals(10, atomic.get());
+ }
+ catch (IgniteException e) {
+ if (X.hasCause(e, ClusterTopologyServerNotFoundException.class))
+ return;
- assert grid(0).atomicLong(STRUCTURE_NAME, 10, true).get() == 20;
+ throw e;
}
+
+ fail();
}
/**
* @throws Exception If failed.
*/
- public void testAtomicLongConstantTopologyChange() throws Exception {
- try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override
- public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- long val = s.get();
-
- while (!fut.isDone()) {
- assert s.get() == val;
+ public void testAtomicLongTopologyChange() throws Exception {
+ try (IgniteAtomicLong atomic = grid(0).atomicLong(STRUCTURE_NAME, 10, true)) {
+ Ignite g = startGrid(NEW_GRID_NAME);
- assert s.incrementAndGet() == val + 1;
+ assertEquals(10, g.atomicLong(STRUCTURE_NAME, 10, false).get());
- val++;
- }
+ assertEquals(20, g.atomicLong(STRUCTURE_NAME, 10, false).addAndGet(10));
- fut.get();
+ stopGrid(NEW_GRID_NAME);
- for (Ignite g : G.allGrids())
- assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get());
+ assertEquals(20, grid(0).atomicLong(STRUCTURE_NAME, 10, true).get());
}
}
/**
* @throws Exception If failed.
*/
- public void testAtomicLongConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
+ public void testAtomicLongConstantTopologyChange() throws Exception {
+ doTestAtomicLong(new ConstantTopologyChangeWorker());
+ }
- names.add(name);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicLongConstantMultipleTopologyChange() throws Exception {
+ doTestAtomicLong(multipleTopologyChangeWorker());
+ }
- Ignite g = startGrid(name);
+ /**
+ * Tests IgniteAtomicLong.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicLong(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicLong s = grid(0).atomicLong(STRUCTURE_NAME, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
- assert g.atomicLong(STRUCTURE_NAME, 1, true).get() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
long val = s.get();
while (!fut.isDone()) {
- assert s.get() == val;
-
- assert s.incrementAndGet() == val + 1;
+ assertEquals(val, s.get());
- val++;
+ assertEquals(++val, s.incrementAndGet());
}
fut.get();
for (Ignite g : G.allGrids())
- assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, true).get());
+ assertEquals(val, g.atomicLong(STRUCTURE_NAME, 1, false).get());
}
}
@@ -244,13 +246,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try (IgniteAtomicReference atomic = grid(0).atomicReference(STRUCTURE_NAME, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.atomicReference(STRUCTURE_NAME, 10, true).get() == 10;
+ assertEquals((Integer)10, g.atomicReference(STRUCTURE_NAME, 10, false).get());
- g.atomicReference(STRUCTURE_NAME, 10, true).set(20);
+ g.atomicReference(STRUCTURE_NAME, 10, false).set(20);
stopGrid(NEW_GRID_NAME);
- assertEquals(20, (int) grid(0).atomicReference(STRUCTURE_NAME, 10, true).get());
+ assertEquals((Integer)20, grid(0).atomicReference(STRUCTURE_NAME, 10, true).get());
}
}
@@ -258,85 +260,36 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicReferenceConstantTopologyChange() throws Exception {
- try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override
- public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- int val = s.get();
-
- while (!fut.isDone()) {
- assert s.get() == val;
-
- s.set(++val);
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids())
- assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get());
- }
+ doTestAtomicReference(new ConstantTopologyChangeWorker());
}
/**
* @throws Exception If failed.
*/
public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
+ doTestAtomicReference(multipleTopologyChangeWorker());
+ }
- Ignite g = startGrid(name);
+ /**
+ * Tests atomic reference.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicReference(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicReference<Integer> s = grid(0).atomicReference(STRUCTURE_NAME, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.atomicReference(STRUCTURE_NAME, 1, false).get() > 0;
- assert g.atomicReference(STRUCTURE_NAME, 1, true).get() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.get();
while (!fut.isDone()) {
- assert s.get() == val;
+ assertEquals(val, (int)s.get());
s.set(++val);
}
@@ -344,7 +297,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
fut.get();
for (Ignite g : G.allGrids())
- assert g.atomicReference(STRUCTURE_NAME, 1, true).get() == val;
+ assertEquals(val, (int)g.atomicReference(STRUCTURE_NAME, 1, true).get());
}
}
@@ -355,19 +308,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try (IgniteAtomicStamped atomic = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, true).get();
+ IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 10, 10, false).get();
- assert t.get1() == 10;
- assert t.get2() == 10;
+ assertEquals((Integer)10, t.get1());
+ assertEquals((Integer)10, t.get2());
- g.atomicStamped(STRUCTURE_NAME, 10, 10, true).set(20, 20);
+ g.atomicStamped(STRUCTURE_NAME, 10, 10, false).set(20, 20);
stopGrid(NEW_GRID_NAME);
- t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, true).get();
+ t = grid(0).atomicStamped(STRUCTURE_NAME, 10, 10, false).get();
- assert t.get1() == 20;
- assert t.get2() == 20;
+ assertEquals((Integer)20, t.get1());
+ assertEquals((Integer)20, t.get2());
}
}
@@ -375,107 +328,44 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicStampedConstantTopologyChange() throws Exception {
- try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override
- public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
-
- IgniteBiTuple<Integer, Integer> t =
- g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
-
- assert t.get1() > 0;
- assert t.get2() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- int val = s.value();
-
- while (!fut.isDone()) {
- IgniteBiTuple<Integer, Integer> t = s.get();
-
- assert t.get1() == val;
- assert t.get2() == val;
-
- val++;
-
- s.set(val, val);
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids()) {
- IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
-
- assert t.get1() == val;
- assert t.get2() == val;
- }
- }
+ doTestAtomicStamped(new ConstantTopologyChangeWorker());
}
/**
* @throws Exception If failed.
*/
public void testAtomicStampedConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
+ doTestAtomicStamped(multipleTopologyChangeWorker());
+ }
- Ignite g = startGrid(name);
+ /**
+ * Tests atomic stamped value.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicStamped(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicStamped<Integer, Integer> s = grid(0).atomicStamped(STRUCTURE_NAME, 1, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ IgniteBiTuple<Integer, Integer> t = ignite.atomicStamped(STRUCTURE_NAME, 1, 1, false).get();
- IgniteBiTuple<Integer, Integer> t =
- g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
+ assert t.get1() > 0;
+ assert t.get2() > 0;
- assert t.get1() > 0;
- assert t.get2() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.value();
while (!fut.isDone()) {
IgniteBiTuple<Integer, Integer> t = s.get();
- assert t.get1() == val;
- assert t.get2() == val;
+ assertEquals(val, (int)t.get1());
+ assertEquals(val, (int)t.get2());
- val++;
+ ++val;
s.set(val, val);
}
@@ -483,10 +373,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
fut.get();
for (Ignite g : G.allGrids()) {
- IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, true).get();
+ IgniteBiTuple<Integer, Integer> t = g.atomicStamped(STRUCTURE_NAME, 1, 1, false).get();
- assert t.get1() == val;
- assert t.get2() == val;
+ assertEquals(val, (int)t.get1());
+ assertEquals(val, (int)t.get2());
}
}
}
@@ -499,16 +389,16 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
try {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 20;
+ assertEquals(20, g.countDownLatch(STRUCTURE_NAME, 20, true, false).count());
- g.countDownLatch(STRUCTURE_NAME, 20, true, true).countDown(10);
+ g.countDownLatch(STRUCTURE_NAME, 20, true, false).countDown(10);
stopGrid(NEW_GRID_NAME);
- assert grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).count() == 10;
+ assertEquals(10, grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).count());
}
finally {
- grid(0).countDownLatch(STRUCTURE_NAME, 20, true, true).countDownAll();
+ grid(0).countDownLatch(STRUCTURE_NAME, 20, true, false).countDownAll();
}
}
}
@@ -517,6 +407,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreTopologyChange() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
try (IgniteSemaphore semaphore = grid(0).semaphore(STRUCTURE_NAME, 20, true, true)) {
try {
@@ -541,6 +432,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreConstantTopologyChange() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
+
try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 10, false, true)) {
try {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -595,6 +488,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreConstantTopologyChangeFailoverSafe() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
+
try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true)) {
try {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -656,6 +551,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreConstantMultipleTopologyChangeFailoverSafe() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
+
final int numPermits = 3;
try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, numPermits, true, true)) {
@@ -728,6 +625,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testSemaphoreConstantTopologyChangeNotFailoverSafe() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1977");
+
try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 1, false, true)) {
try {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@@ -788,105 +687,48 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testCountDownLatchConstantTopologyChange() throws Exception {
+ doTestCountDownLatch(new ConstantTopologyChangeWorker());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCountDownLatchConstantMultipleTopologyChange() throws Exception {
+ doTestCountDownLatch(multipleTopologyChangeWorker());
+ }
+
+ /**
+ * Tests distributed count down latch.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestCountDownLatch(ConstantTopologyChangeWorker topWorker) throws Exception {
try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) {
try {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
-
- try {
- Ignite g = startGrid(name);
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(
+ new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assert ignite.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count() > 0;
- assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
+ return null;
}
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.count();
while (!fut.isDone()) {
- assert s.count() == val;
-
- assert s.countDown() == val - 1;
-
- val--;
+ assertEquals(val, s.count());
+ assertEquals(--val, s.countDown());
}
fut.get();
for (Ignite g : G.allGrids())
- assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count() == val;
+ assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).count());
}
finally {
- grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true).countDownAll();
- }
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testCountDownLatchConstantMultipleTopologyChange() throws Exception {
- try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) {
- try {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
-
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
-
- names.add(name);
-
- Ignite g = startGrid(name);
-
- assert g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false) != null;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
- }
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- int val = s.count();
-
- while (!fut.isDone()) {
- assert s.count() == val;
-
- assert s.countDown() == val - 1;
-
- val--;
- }
-
- fut.get();
-
- for (Ignite g : G.allGrids())
- assertEquals(val, g.countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).count());
- }
- finally {
- grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll();
+ grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, false).countDownAll();
}
}
}
@@ -900,13 +742,13 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).poll() == 10;
+ assertEquals(10, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).poll());
g.queue(STRUCTURE_NAME, 0, null).put(20);
stopGrid(NEW_GRID_NAME);
- assert grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek() == 20;
+ assertEquals(20, (int)grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).peek());
}
finally {
grid(0).<Integer>queue(STRUCTURE_NAME, 0, null).close();
@@ -917,107 +759,138 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testQueueConstantTopologyChange() throws Exception {
+ doTestQueue(new ConstantTopologyChangeWorker());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueueConstantMultipleTopologyChange() throws Exception {
+ doTestQueue(multipleTopologyChangeWorker());
+ }
+
+ /**
+ * Tests the queue.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestQueue(ConstantTopologyChangeWorker topWorker) throws Exception {
+ int queueMaxSize = 100;
+
try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
s.put(1);
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- String name = UUID.randomUUID().toString();
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ IgniteQueue<Integer> queue = ignite.queue(STRUCTURE_NAME, 0, null);
- try {
- Ignite g = startGrid(name);
+ assertNotNull(queue);
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ Integer val = queue.peek();
+
+ assertNotNull(val);
+
+ assert val > 0;
+
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
int val = s.peek();
- int origVal = val;
+ while (!fut.isDone()) {
+ if (s.size() == queueMaxSize) {
+ int last = 0;
+
+ for (int i = 0, size = s.size() - 1; i < size; i++) {
+ int cur = s.poll();
+
+ if (i == 0) {
+ last = cur;
+
+ continue;
+ }
+
+ assertEquals(last, cur - 1);
+
+ last = cur;
+ }
+ }
- while (!fut.isDone())
s.put(++val);
+ }
fut.get();
+ val = s.peek();
+
for (Ignite g : G.allGrids())
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
+ assertEquals(val, (int)g.<Integer>queue(STRUCTURE_NAME, 0, null).peek());
}
}
/**
* @throws Exception If failed.
*/
- public void testQueueConstantMultipleTopologyChange() throws Exception {
- try (IgniteQueue<Integer> s = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
- s.put(1);
+ public void testAtomicSequenceInitialization() throws Exception {
+ int threadCnt = 3;
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- Collection<String> names = new GridLeanSet<>(3);
+ final AtomicInteger idx = new AtomicInteger(gridCount());
- try {
- for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ int id = idx.getAndIncrement();
- names.add(name);
+ try {
+ startGrid(id);
- Ignite g = startGrid(name);
+ Thread.sleep(1000);
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() > 0;
- }
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- for (String name : names)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ catch (Exception e) {
+ throw F.wrap(e);
+ }
+ finally {
+ stopGrid(id);
- int val = s.peek();
+ info("Thread finished.");
+ }
+ }
+ }, threadCnt, "test-thread");
- int origVal = val;
+ while (!fut.isDone()) {
+ grid(0).compute().call(new IgniteCallable<Object>() {
+ /** */
+ @IgniteInstanceResource
+ private Ignite g;
- while (!fut.isDone())
- s.put(++val);
+ @Override public Object call() throws Exception {
+ IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
- fut.get();
+ assert seq != null;
- for (Ignite g : G.allGrids())
- assert g.<Integer>queue(STRUCTURE_NAME, 0, null).peek() == origVal;
+ for (int i = 0; i < 1000; i++)
+ seq.getAndIncrement();
+
+ return null;
+ }
+ });
}
+
+ fut.get();
}
/**
* @throws Exception If failed.
*/
public void testAtomicSequenceTopologyChange() throws Exception {
- try (IgniteAtomicSequence s = grid().atomicSequence(STRUCTURE_NAME, 10, true)) {
+ try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 10, true)) {
Ignite g = startGrid(NEW_GRID_NAME);
- assert g.atomicSequence(STRUCTURE_NAME, 10, false).get() == 1010;
+ assertEquals(1010, g.atomicSequence(STRUCTURE_NAME, 10, false).get());
- assert g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10) == 1020;
+ assertEquals(1020, g.atomicSequence(STRUCTURE_NAME, 10, false).addAndGet(10));
stopGrid(NEW_GRID_NAME);
}
@@ -1027,29 +900,31 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicSequenceConstantTopologyChange() throws Exception {
- try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- try {
- String name = UUID.randomUUID().toString();
+ doTestAtomicSequence(new ConstantTopologyChangeWorker());
+ }
- for (int i = 0; i < TOP_CHANGE_CNT; i++) {
- try {
- Ignite g = startGrid(name);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
+ doTestAtomicSequence(multipleTopologyChangeWorker());
+ }
- assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
- }
- finally {
- if (i != TOP_CHANGE_CNT - 1)
- stopGrid(name);
- }
- }
- }
- catch (Exception e) {
- throw F.wrap(e);
- }
+ /**
+ * Tests atomic sequence.
+ *
+ * @param topWorker Topology change worker.
+ * @throws Exception If failed.
+ */
+ private void doTestAtomicSequence(ConstantTopologyChangeWorker topWorker) throws Exception {
+ try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
+ IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ assertTrue(ignite.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
+
+ return null;
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+ });
long old = s.get();
@@ -1070,135 +945,228 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/**
* @throws Exception If failed.
*/
- public void testAtomicSequenceInitialization() throws Exception {
- int threadCnt = 3;
+ public void testUncommitedTxLeave() throws Exception {
+ final int val = 10;
- final AtomicInteger idx = new AtomicInteger(gridCount());
+ grid(0).atomicLong(STRUCTURE_NAME, val, true);
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
- @Override public void apply() {
- int id = idx.getAndIncrement();
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Ignite g = startGrid(NEW_GRID_NAME);
try {
- startGrid(id);
+ g.transactions().txStart();
- Thread.sleep(1000);
+ g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1);
- }
- catch (Exception e) {
- throw F.wrap(e);
+ assertEquals(val + 1, g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet());
}
finally {
- stopGrid(id);
-
- info("Thread finished.");
+ stopGrid(NEW_GRID_NAME);
}
+
+ return null;
}
- }, threadCnt, "test-thread");
+ }).get();
- while (!fut.isDone()) {
- grid(0).compute().call(new IgniteCallable<Object>() {
- /** */
- @IgniteInstanceResource
- private Ignite g;
+ waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()]));
- @Override public Object call() throws Exception {
- IgniteAtomicSequence seq = g.atomicSequence(STRUCTURE_NAME, 1, true);
+ assertEquals(val + 1, grid(0).atomicLong(STRUCTURE_NAME, val, false).get());
+ }
- assert seq != null;
+ /**
+ * @return Specific multiple topology change worker implementation.
+ */
+ private ConstantTopologyChangeWorker multipleTopologyChangeWorker() {
+ return collectionCacheMode() == CacheMode.PARTITIONED ? new PartitionedMultipleTopologyChangeWorker() :
+ new MultipleTopologyChangeWorker();
+ }
- for (int i = 0; i < 1000; i++)
- seq.getAndIncrement();
+ /**
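+ * Worker whose threads repeatedly start a randomly named node, apply a callback to it and stop it (the last node stays up).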
+ */
+ private class ConstantTopologyChangeWorker {
+ /** Set by the first worker thread that fails; the other threads check it and return early. */
+ protected final AtomicBoolean failed = new AtomicBoolean(false);
+
+ /**
+ * Starts changing cluster's topology.
+ *
+ * @return Future.
+ */
+ IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ try {
+ for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ if (failed.get())
+ return;
- return null;
+ String name = UUID.randomUUID().toString();
+
+ try {
+ Ignite g = startGrid(name);
+
+ callback.apply(g);
+ }
+ finally {
+ if (i != TOP_CHANGE_CNT - 1)
+ stopGrid(name);
+ }
+ }
}
- });
- }
+ catch (Exception e) {
+ if (failed.compareAndSet(false, true))
+ throw F.wrap(e);
+ }
+ }
+ }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
- fut.get();
+ return fut;
+ }
}
/**
- * @throws Exception If failed.
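+ * Worker whose threads start three nodes per iteration, apply a callback to each and then stop them together.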
*/
- public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
- try (IgniteAtomicSequence s = grid(0).atomicSequence(STRUCTURE_NAME, 1, true)) {
+ private class MultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
+ /**
+ * Starts changing cluster's topology.
+ *
+ * @return Future.
+ */
+ @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
@Override public void apply() {
try {
for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ if (failed.get())
+ return;
+
Collection<String> names = new GridLeanSet<>(3);
try {
for (int j = 0; j < 3; j++) {
- String name = UUID.randomUUID().toString();
+ if (failed.get())
+ return;
- names.add(name);
+ String name = UUID.randomUUID().toString();
Ignite g = startGrid(name);
- assertTrue(g.atomicSequence(STRUCTURE_NAME, 1, false).get() > 0);
+ names.add(name);
+
+ callback.apply(g);
}
}
finally {
- if (i != TOP_CHANGE_CNT - 1)
+ if (i != TOP_CHANGE_CNT - 1) {
for (String name : names)
stopGrid(name);
+ }
}
}
}
catch (Exception e) {
- throw F.wrap(e);
+ if (failed.compareAndSet(false, true))
+ throw F.wrap(e);
}
}
}, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
- long old = s.get();
-
- while (!fut.isDone()) {
- assertEquals(old, s.get());
-
- long val = s.incrementAndGet();
-
- assertTrue(val > old);
-
- old = val;
- }
-
- fut.get();
+ return fut;
}
}
/**
- * @throws Exception If failed.
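+ * Worker used in partitioned mode: threads start three nodes each, then a barrier action stops them one by one.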
*/
- public void testUncommitedTxLeave() throws Exception {
- final int val = 10;
+ private class PartitionedMultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
+ /** Barrier the worker threads await after starting their nodes; its action stops all started nodes. */
+ private CyclicBarrier barrier;
+
+ /**
+ * Starts changing cluster's topology.
+ *
+ * @return Future.
+ */
+ @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+ final Semaphore sem = new Semaphore(TOP_CHANGE_THREAD_CNT);
+
+ final ConcurrentSkipListSet<String> startedNodes = new ConcurrentSkipListSet<>();
+
+ barrier = new CyclicBarrier(TOP_CHANGE_THREAD_CNT, new Runnable() {
+ @Override public void run() {
+ try {
+ assertEquals(TOP_CHANGE_THREAD_CNT * 3, startedNodes.size());
- grid(0).atomicLong(STRUCTURE_NAME, val, true);
+ for (String name : startedNodes) {
+ stopGrid(name, false);
- GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Ignite g = startGrid(NEW_GRID_NAME);
+ awaitPartitionMapExchange();
+ }
- try {
- g.transactions().txStart();
+ startedNodes.clear();
+ sem.release(TOP_CHANGE_THREAD_CNT);
- g.cache(TRANSACTIONAL_CACHE_NAME).put(1, 1);
+ barrier.reset();
+ }
+ catch (Exception e) {
+ if (failed.compareAndSet(false, true)) {
+ sem.release(TOP_CHANGE_THREAD_CNT);
- assert g.atomicLong(STRUCTURE_NAME, val, false).incrementAndGet() == val + 1;
- }
- finally {
- stopGrid(NEW_GRID_NAME);
+ barrier.reset();
+
+ throw F.wrap(e);
+ }
+ }
}
+ });
- return null;
- }
- }).get();
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override public void apply() {
+ try {
+ for (int i = 0; i < TOP_CHANGE_CNT; i++) {
+ sem.acquire();
- waitForDiscovery(G.allGrids().toArray(new Ignite[gridCount()]));
+ if (failed.get())
+ return;
+
+ for (int j = 0; j < 3; j++) {
+ if (failed.get())
+ return;
+
+ String name = UUID.randomUUID().toString();
+
+ startedNodes.add(name);
- assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1;
+ Ignite g = startGrid(name);
+
+ callback.apply(g);
+ }
+
+ try {
+ barrier.await();
+ }
+ catch (BrokenBarrierException e) {
+ // Ignore.
+ }
+ }
+ }
+ catch (Exception e) {
+ if (failed.compareAndSet(false, true)) {
+ sem.release(TOP_CHANGE_THREAD_CNT);
+
+ barrier.reset();
+
+ throw F.wrap(e);
+ }
+ }
+ }
+ }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
+
+ return fut;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
index 18b0b21..6c880a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedDataStructuresFailoverSelfTest.java
@@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
public class GridCachePartitionedDataStructuresFailoverSelfTest
extends GridCacheAbstractDataStructuresFailoverSelfTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-803");
- }
-
- /** {@inheritDoc} */
@Override protected CacheMode collectionCacheMode() {
return PARTITIONED;
}
@@ -50,4 +45,4 @@ public class GridCachePartitionedDataStructuresFailoverSelfTest
@Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
return TRANSACTIONAL;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
index 86b763a..b3ded7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedOffheapDataStructuresFailoverSelfTest.java
@@ -24,16 +24,10 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
/**
* Failover tests for cache data structures.
*/
-public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest extends GridCachePartitionedDataStructuresFailoverSelfTest {
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-803");
- }
-
+public class GridCachePartitionedOffheapDataStructuresFailoverSelfTest
+ extends GridCachePartitionedDataStructuresFailoverSelfTest {
/** {@inheritDoc} */
@Override protected CacheMemoryMode collectionMemoryMode() {
return OFFHEAP_TIERED;
}
-
-
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
index d0131d6..28ce901 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java
@@ -32,11 +32,6 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
public class GridCacheReplicatedDataStructuresFailoverSelfTest
extends GridCacheAbstractDataStructuresFailoverSelfTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-801");
- }
-
- /** {@inheritDoc} */
@Override protected CacheMode collectionCacheMode() {
return REPLICATED;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
index 19daa26..c00557d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -150,6 +150,8 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract
* @throws Exception If failed.
*/
public void testClientQueueCreateCloseFailover() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-1976");
+
testFailoverWithClient(new IgniteInClosure<Ignite>() {
@Override public void apply(Ignite ignite) {
for (int i = 0; i < 100; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index caca2ca..94dc665 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -180,7 +180,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
* @return Timeout.
*/
protected long awaitForSocketWriteTimeout() {
- return 5000;
+ return 8000;
}
/**
@@ -742,4 +742,4 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
nodes.clear();
spiRsrcs.clear();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 344efc0..6b20b2a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -273,7 +273,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
}
try {
- assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS));
+ assertTrue(latch.await(failureThreshold + 3000, TimeUnit.MILLISECONDS));
assertFalse("Unexpected event, see log for details.", err.get());
assertEquals(nodeId, client.cluster().localNode().id());
@@ -331,4 +331,4 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
err = null;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 379a3a6..42960e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -373,6 +373,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private void testFailureDetectionOnNodePing(Ignite pingingNode, Ignite failedNode) throws Exception {
final CountDownLatch cnt = new CountDownLatch(1);
+ final UUID failedNodeId = failedNode.cluster().localNode().id();
+
pingingNode.events().localListen(
new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
@@ -390,9 +392,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
TcpDiscoverySpi spi = discoMap.get(pingingNode.name());
- boolean res = spi.pingNode(failedNode.cluster().localNode().id());
+ boolean res = spi.pingNode(failedNodeId);
- assertFalse("Ping is ok for node " + failedNode.cluster().localNode().id() + ", but had to fail.", res);
+ assertFalse("Ping is ok for node " + failedNodeId + ", but had to fail.", res);
// Heartbeat interval is 40 seconds, but we should detect node failure faster.
assert cnt.await(7, SECONDS);
@@ -409,6 +411,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true;
+ final UUID failedNodeId = failedNode.cluster().localNode().id();
+
final CountDownLatch pingLatch = new CountDownLatch(1);
final CountDownLatch eventLatch = new CountDownLatch(1);
@@ -422,7 +426,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
pingingNode.events().localListen(
new IgnitePredicate<Event>() {
@Override public boolean apply(Event event) {
- if (((DiscoveryEvent)event).eventNode().id().equals(failedNode.cluster().localNode().id())) {
+ if (((DiscoveryEvent)event).eventNode().id().equals(failedNodeId)) {
failRes.set(true);
eventLatch.countDown();
}
@@ -438,7 +442,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
pingLatch.countDown();
pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
- failedNode.cluster().localNode().id()));
+ failedNodeId));
return null;
}
@@ -1166,7 +1170,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
for (IgniteKernal grid : grids)
assertEquals(startTime, (Long)grid.context().discovery().gridStartTime());
- grids.add((IgniteKernal) startGrid(5));
+ grids.add((IgniteKernal)startGrid(5));
for (IgniteKernal grid : grids)
assertEquals(startTime, (Long)grid.context().discovery().gridStartTime());
@@ -1326,6 +1330,61 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed
+ */
+ public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception {
+ try {
+ TestMessageWorkerFailureSpi spi0 = new TestMessageWorkerFailureSpi();
+
+ nodeSpi.set(spi0);
+
+ final Ignite ignite0 = startGrid(0);
+
+ nodeSpi.set(new TcpDiscoverySpi());
+
+ Ignite ignite1 = startGrid(1);
+
+ final AtomicBoolean disconnected = new AtomicBoolean();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final UUID failedNodeId = ignite0.cluster().localNode().id();
+
+ ignite1.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event event) {
+ if (event.type() == EventType.EVT_NODE_FAILED &&
+ failedNodeId.equals(((DiscoveryEvent)event).eventNode().id()))
+ disconnected.set(true);
+
+ latch.countDown();
+
+ return false;
+ }
+ }, EventType.EVT_NODE_FAILED);
+
+ spi0.stop = true;
+
+ latch.await(15, TimeUnit.SECONDS);
+
+ assertTrue(disconnected.get());
+
+ try {
+ ignite0.cluster().localNode().id();
+ }
+ catch (IllegalStateException e) {
+ if (e.getMessage().contains("Grid is in invalid state to perform this operation"))
+ return;
+ }
+
+ fail();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+
+ /**
* @param twoNodes If {@code true} starts two nodes, otherwise three.
* @throws Exception If failed
*/
@@ -1891,6 +1950,25 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
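+ * Discovery SPI that throws a {@link RuntimeException} from {@code writeToSocket} once {@code stop} is set.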
+ */
+ private static class TestMessageWorkerFailureSpi extends TcpDiscoverySpi {
+ /** When {@code true}, {@code writeToSocket} throws instead of writing the message. */
+ private volatile boolean stop;
+
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+
+ if (stop)
+ throw new RuntimeException("Failing ring message worker explicitly");
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+ }
+
+ /**
* Starts new grid with given index. Method optimize is not invoked.
*
* @param idx Index of the grid to start.
@@ -1911,4 +1989,4 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
private Ignite startGridNoOptimize(String gridName) throws Exception {
return G.start(getConfiguration(gridName));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 6f9c559..1fd4cb1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -856,7 +856,7 @@ public abstract class GridAbstractTest extends TestCase {
List<Ignite> ignites = G.allGrids();
for (Ignite g : ignites) {
- if (g.cluster().localNode().isClient())
+ if (g.configuration().getDiscoverySpi().isClientMode())
stopGrid(g.name(), cancel);
}
}
@@ -868,7 +868,7 @@ public abstract class GridAbstractTest extends TestCase {
List<Ignite> ignites = G.allGrids();
for (Ignite g : ignites) {
- if (!g.cluster().localNode().isClient())
+ if (!g.configuration().getDiscoverySpi().isClientMode())
stopGrid(g.name(), cancel);
}
}
@@ -2065,4 +2065,4 @@ public abstract class GridAbstractTest extends TestCase {
*/
public abstract void run(Ignite ignite, IgniteCache<K, V> cache) throws Exception;
}
-}
\ No newline at end of file
+}
[2/3] ignite git commit: ignite-801 and ignite-1911: resurrecting
data structure and atomics failover tests + stopping the node if ring message
worker fails
Posted by dm...@apache.org.
ignite-801 and ignite-1911: resurrecting data structure and atomics failover tests + stopping the node if ring message worker fails
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c711484c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c711484c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c711484c
Branch: refs/heads/ignite-1.5
Commit: c711484c30315c06ce0b31a8775bfc41b7ee1483
Parents: 8e7e330
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Nov 20 19:11:07 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Nov 20 19:11:07 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheUtils.java | 39 +-
.../CacheDataStructuresManager.java | 2 +-
.../GridFutureRemapTimeoutObject.java | 72 --
.../dht/GridPartitionedGetFuture.java | 28 +-
.../distributed/near/GridNearGetFuture.java | 28 +-
.../IgniteTxImplicitSingleStateImpl.java | 29 +-
.../IgniteTxRemoteSingleStateImpl.java | 19 +-
.../datastructures/DataStructuresProcessor.java | 47 +-
.../GridAtomicCacheQueueImpl.java | 126 +--
.../GridCacheAtomicReferenceImpl.java | 10 +-
.../GridCacheCountDownLatchImpl.java | 15 +-
.../datastructures/GridCacheQueueAdapter.java | 32 +-
.../GridTransactionalCacheQueueImpl.java | 193 ++--
.../ignite/spi/discovery/DiscoverySpi.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 36 +
...eAbstractDataStructuresFailoverSelfTest.java | 924 +++++++++----------
...rtitionedDataStructuresFailoverSelfTest.java | 7 +-
...edOffheapDataStructuresFailoverSelfTest.java | 12 +-
...eplicatedDataStructuresFailoverSelfTest.java | 5 -
...gniteAtomicLongChangingTopologySelfTest.java | 2 +
...GridTcpCommunicationSpiRecoverySelfTest.java | 4 +-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 4 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 90 +-
.../testframework/junits/GridAbstractTest.java | 6 +-
24 files changed, 796 insertions(+), 936 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index f7d115f..89779d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -93,6 +94,7 @@ import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -1780,28 +1782,41 @@ public class GridCacheUtils {
public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) {
return new Callable<S>() {
@Override public S call() throws Exception {
- int retries = GridCacheAdapter.MAX_RETRIES;
-
IgniteCheckedException err = null;
- for (int i = 0; i < retries; i++) {
+ for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
try {
return c.call();
}
+ catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
+ throw e;
+ }
+ catch (TransactionRollbackException e) {
+ if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+ throw e;
+
+ U.sleep(1);
+ }
catch (IgniteCheckedException e) {
- if (X.hasCause(e, ClusterTopologyCheckedException.class) ||
- X.hasCause(e, IgniteTxRollbackCheckedException.class) ||
- X.hasCause(e, CachePartialUpdateCheckedException.class)) {
- if (i < retries - 1) {
- err = e;
+ if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+ throw e;
- U.sleep(1);
+ if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
- continue;
- }
+ if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
+ ClusterTopologyServerNotFoundException)
+ throw e;
- throw e;
+ // IGNITE-1948: remove this check when the issue is fixed
+ if (topErr.retryReadyFuture() != null)
+ topErr.retryReadyFuture().get();
+ else
+ U.sleep(1);
}
+ else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
+ CachePartialUpdateCheckedException.class))
+ U.sleep(1);
else
throw e;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 1ff4575..930921b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -770,4 +770,4 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
return "RemoveSetCallable [setId=" + setId + ']';
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
deleted file mode 100644
index 72fdd4b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-
-/**
- * Future remap timeout object.
- */
-public class GridFutureRemapTimeoutObject extends GridTimeoutObjectAdapter {
- /** */
- private final GridFutureAdapter<?> fut;
-
- /** Finished flag. */
- private final AtomicBoolean finished = new AtomicBoolean();
-
- /** Topology version to wait. */
- private final AffinityTopologyVersion topVer;
-
- /** Exception cause. */
- private final IgniteCheckedException e;
-
- /**
- * @param fut Future.
- * @param timeout Timeout.
- * @param topVer Topology version timeout was created on.
- * @param e Exception cause.
- */
- public GridFutureRemapTimeoutObject(
- GridFutureAdapter<?> fut,
- long timeout,
- AffinityTopologyVersion topVer,
- IgniteCheckedException e) {
- super(timeout);
-
- this.fut = fut;
- this.topVer = topVer;
- this.e = e;
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- if (finish()) // Fail the whole get future, else remap happened concurrently.
- fut.onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + topVer, e));
- }
-
- /**
- * @return Guard against concurrent completion.
- */
- public boolean finish() {
- return finished.compareAndSet(false, true);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index c3d9836..e3fae22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -644,34 +643,23 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
final AffinityTopologyVersion updTopVer =
new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
- final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
- cctx.kernalContext().config().getNetworkTimeout(),
- updTopVer,
- e);
-
cctx.affinity().affinityReadyFuture(updTopVer).listen(
new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- if (timeout.finish()) {
- cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
- try {
- fut.get();
+ try {
+ fut.get();
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
- onDone(Collections.<K, V>emptyMap());
- }
- catch (IgniteCheckedException e) {
- GridPartitionedGetFuture.this.onDone(e);
- }
+ onDone(Collections.<K, V>emptyMap());
+ }
+ catch (IgniteCheckedException e) {
+ GridPartitionedGetFuture.this.onDone(e);
}
}
}
);
-
- cctx.kernalContext().timeout().addTimeoutObject(timeout);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index dfaa44e..f1bff61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -851,34 +850,23 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
final AffinityTopologyVersion updTopVer =
new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
- final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
- cctx.kernalContext().config().getNetworkTimeout(),
- updTopVer,
- e);
-
cctx.affinity().affinityReadyFuture(updTopVer).listen(
new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- if (timeout.finish()) {
- cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
- try {
- fut.get();
+ try {
+ fut.get();
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
- onDone(Collections.<K, V>emptyMap());
- }
- catch (IgniteCheckedException e) {
- GridNearGetFuture.this.onDone(e);
- }
+ onDone(Collections.<K, V>emptyMap());
+ }
+ catch (IgniteCheckedException e) {
+ GridNearGetFuture.this.onDone(e);
}
}
}
);
-
- cctx.kernalContext().timeout().addTimeoutObject(timeout);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index c75a8f38..3e0231e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.cache.transactions;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
@@ -28,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -160,8 +163,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
CacheStoreManager store = cacheCtx.store();
- if (store.configured())
- return Collections.singleton(store);
+ if (store.configured()) {
+ HashSet<CacheStoreManager> set = new HashSet<>(3, 0.75f);
+
+ set.add(store);
+
+ return set;
+ }
return null;
}
@@ -192,12 +200,20 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public Set<IgniteTxKey> writeSet() {
- return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet();
+ if (entry != null) {
+ HashSet<IgniteTxKey> set = new HashSet<>(3, 0.75f);
+
+ set.add(entry.txKey());
+
+ return set;
+ }
+ else
+ return Collections.<IgniteTxKey>emptySet();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> writeEntries() {
- return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
}
/** {@inheritDoc} */
@@ -207,8 +223,7 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
- return entry != null ? Collections.singletonMap(entry.txKey(), entry) :
- Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
+ return entry != null ? F.asMap(entry.txKey(), entry) : Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
}
/** {@inheritDoc} */
@@ -223,7 +238,7 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> allEntries() {
- return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
index 22f04a8..90af517 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
@@ -17,10 +17,13 @@
package org.apache.ignite.internal.processors.cache.transactions;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -62,12 +65,20 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
/** {@inheritDoc} */
@Override public Set<IgniteTxKey> writeSet() {
- return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet();
+ if (entry != null) {
+ HashSet<IgniteTxKey> set = new HashSet<>(3, 0.75f);
+
+ set.add(entry.txKey());
+
+ return set;
+ }
+ else
+ return Collections.<IgniteTxKey>emptySet();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> writeEntries() {
- return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
}
/** {@inheritDoc} */
@@ -77,7 +88,7 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
- return entry != null ? Collections.singletonMap(entry.txKey(), entry) :
+ return entry != null ? F.asMap(entry.txKey(), entry) :
Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
}
@@ -93,7 +104,7 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> allEntries() {
- return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index b532d7f..23d64cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -56,14 +56,13 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -532,21 +531,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (dataStructure != null)
return dataStructure;
- if (!create)
- return c.applyx();
-
while (true) {
- try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+ try {
+ if (!create)
+ return c.applyx();
- if (err != null)
- throw err;
+ try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
- dataStructure = c.applyx();
+ if (err != null)
+ throw err;
- tx.commit();
+ dataStructure = c.applyx();
+
+ tx.commit();
- return dataStructure;
+ return dataStructure;
+ }
}
catch (IgniteTxRollbackCheckedException ignore) {
// Safe to retry right away.
@@ -1605,27 +1606,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
*/
public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
try {
- int cnt = 0;
-
- while (true) {
- try {
- return call.call();
- }
- catch (ClusterGroupEmptyCheckedException e) {
- throw new IgniteCheckedException(e);
- }
- catch (IgniteTxRollbackCheckedException |
- CachePartialUpdateCheckedException |
- ClusterTopologyCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to execute data structure operation, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ return GridCacheUtils.retryTopologySafe(call).call();
}
catch (IgniteCheckedException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
index 28f8631..b433887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
@@ -23,7 +23,6 @@ import java.util.Map;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -56,26 +55,9 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
checkRemoved(idx);
- int cnt = 0;
-
GridCacheQueueItemKey key = itemKey(idx);
- while (true) {
- try {
- cache.getAndPut(key, item);
-
- break;
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to put queue item, will retry [err=" + e + ", idx=" + idx + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ cache.getAndPut(key, item);
return true;
}
@@ -98,38 +80,18 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
GridCacheQueueItemKey key = itemKey(idx);
- int cnt = 0;
-
- long stop = 0;
+ T data = (T)cache.getAndRemove(key);
- while (true) {
- try {
- T data = (T)cache.getAndRemove(key);
+ if (data != null)
+ return data;
- if (data != null)
- return data;
+ long stop = U.currentTimeMillis() + RETRY_TIMEOUT;
- if (stop == 0)
- stop = U.currentTimeMillis() + RETRY_TIMEOUT;
+ while (U.currentTimeMillis() < stop) {
+ data = (T)cache.getAndRemove(key);
- while (U.currentTimeMillis() < stop ) {
- data = (T)cache.getAndRemove(key);
-
- if (data != null)
- return data;
- }
-
- break;
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to remove queue item, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
+ if (data != null)
+ return data;
}
U.warn(log, "Failed to get item, will retry poll [queue=" + queueName + ", idx=" + idx + ']');
@@ -161,24 +123,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
idx++;
}
- int cnt = 0;
-
- while (true) {
- try {
- cache.putAll(putMap);
-
- break;
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add items, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ cache.putAll(putMap);
return true;
}
@@ -197,34 +142,14 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
GridCacheQueueItemKey key = itemKey(idx);
- int cnt = 0;
+ if (cache.remove(key))
+ return;
- long stop = 0;
-
- while (true) {
- try {
- if (cache.remove(key))
- return;
+ long stop = U.currentTimeMillis() + RETRY_TIMEOUT;
- if (stop == 0)
- stop = U.currentTimeMillis() + RETRY_TIMEOUT;
-
- while (U.currentTimeMillis() < stop ) {
- if (cache.remove(key))
- return;
- }
-
- break;
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add items, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
+ while (U.currentTimeMillis() < stop) {
+ if (cache.remove(key))
+ return;
}
U.warn(log, "Failed to remove item, [queue=" + queueName + ", idx=" + idx + ']');
@@ -239,21 +164,6 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
@SuppressWarnings("unchecked")
@Nullable private Long transformHeader(EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long> c)
throws IgniteCheckedException {
- int cnt = 0;
-
- while (true) {
- try {
- return (Long)cache.invoke(queueKey, c).get();
- }
- catch (CachePartialUpdateCheckedException e) {
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to update queue header, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ return (Long)cache.invoke(queueKey, c).get();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index c0c38b2..37cdaea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
/**
* Cache atomic reference implementation.
@@ -230,7 +231,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
* @return Callable for execution in async and sync mode.
*/
private Callable<Boolean> internalSet(final T val) {
- return new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -252,7 +253,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
throw e;
}
}
- };
+ });
}
/**
@@ -265,7 +266,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
*/
private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
final IgniteClosure<T, T> newValClos) {
- return new Callable<Boolean>() {
+
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -295,7 +297,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
throw e;
}
}
- };
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 2667938..c984ab3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -342,20 +342,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
private class GetCountCallable implements Callable<Integer> {
/** {@inheritDoc} */
@Override public Integer call() throws Exception {
- Integer val;
+ GridCacheCountDownLatchValue latchVal = latchView.get(key);
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheCountDownLatchValue latchVal = latchView.get(key);
-
- if (latchVal == null)
- return 0;
-
- val = latchVal.get();
-
- tx.rollback();
- }
-
- return val;
+ return latchVal == null ? 0 : latchVal.get();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 0e4aebc..df1bd88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -58,9 +58,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE;
/** */
- protected static final int MAX_UPDATE_RETRIES = 100;
-
- /** */
protected static final long RETRY_DELAY = 1;
/** */
@@ -169,14 +166,22 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@SuppressWarnings("unchecked")
@Nullable @Override public T peek() throws IgniteException {
try {
- GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
+ while (true) {
+ GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
- checkRemoved(hdr);
+ checkRemoved(hdr);
- if (hdr.empty())
- return null;
+ if (hdr.empty())
+ return null;
- return (T)cache.get(itemKey(hdr.head()));
+ T val = (T)cache.get(itemKey(hdr.head()));
+
+ if (val == null)
+ // Header might have been polled. Retry.
+ continue;
+
+ return val;
+ }
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -416,8 +421,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
long startIdx,
long endIdx,
int batchSize)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10);
for (long idx = startIdx; idx < endIdx; idx++) {
@@ -435,8 +439,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
}
/**
- * Checks result of closure modifying queue header, throws {@link IllegalStateException}
- * if queue was removed.
+ * Checks result of closure modifying queue header, throws {@link IllegalStateException} if queue was removed.
*
* @param idx Result of closure execution.
*/
@@ -529,7 +532,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
*/
protected abstract void removeItem(long rmvIdx) throws IgniteCheckedException;
-
/**
* @param idx Item index.
* @return Item key.
@@ -1036,7 +1038,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
if (o == null || getClass() != o.getClass())
return false;
- GridCacheQueueAdapter that = (GridCacheQueueAdapter) o;
+ GridCacheQueueAdapter that = (GridCacheQueueAdapter)o;
return id.equals(that.id);
@@ -1051,4 +1053,4 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@Override public String toString() {
return S.toString(GridCacheQueueAdapter.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index c7750a6..4880324 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -20,19 +20,17 @@ package org.apache.ignite.internal.processors.datastructures;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -55,12 +53,10 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
A.notNull(item, "item");
try {
- boolean retVal;
+ return retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ boolean retVal;
- int cnt = 0;
-
- while (true) {
- try {
try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get();
@@ -76,75 +72,59 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
tx.commit();
- break;
+ return retVal;
}
}
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
-
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
-
- return retVal;
+ }).call();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Nullable @Override public T poll() throws IgniteException {
try {
- int cnt = 0;
-
- T retVal;
-
- while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
+ return retryTopologySafe(new Callable<T>() {
+ @Override public T call() throws Exception {
+ T retVal;
- if (idx != null) {
- checkRemoved(idx);
-
- retVal = (T)cache.getAndRemove(itemKey(idx));
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
- assert retVal != null : idx;
- }
- else
- retVal = null;
+ if (idx != null) {
+ checkRemoved(idx);
- tx.commit();
+ retVal = (T)cache.getAndRemove(itemKey(idx));
- break;
- }
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
+ assert retVal != null : idx;
+ }
+ else
+ retVal = null;
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to poll item, will retry [err=" + e + ']');
+ tx.commit();
- U.sleep(RETRY_DELAY);
+ return retVal;
}
}
- }
-
- return retVal;
+ }).call();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@@ -153,95 +133,78 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
A.notNull(items, "items");
try {
- boolean retVal;
-
- int cnt = 0;
+ return retryTopologySafe(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ boolean retVal;
- while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
-
- if (idx != null) {
- checkRemoved(idx);
-
- Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
- for (T item : items) {
- putMap.put(itemKey(idx), item);
+ if (idx != null) {
+ checkRemoved(idx);
- idx++;
- }
+ Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
- cache.putAll(putMap);
+ for (T item : items) {
+ putMap.put(itemKey(idx), item);
- retVal = true;
- }
- else
- retVal = false;
+ idx++;
+ }
- tx.commit();
+ cache.putAll(putMap);
- break;
- }
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
+ retVal = true;
+ }
+ else
+ retVal = false;
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
+ tx.commit();
- U.sleep(RETRY_DELAY);
+ return retVal;
}
}
- }
-
- return retVal;
+ }).call();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected void removeItem(final long rmvIdx) throws IgniteCheckedException {
try {
- int cnt = 0;
+ retryTopologySafe(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
- while (true) {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
+ if (idx != null) {
+ checkRemoved(idx);
- if (idx != null) {
- checkRemoved(idx);
+ boolean rmv = cache.remove(itemKey(idx));
- boolean rmv = cache.remove(itemKey(idx));
+ assert rmv : idx;
+ }
- assert rmv : idx;
+ tx.commit();
}
- tx.commit();
-
- break;
+ return null;
}
- catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
- if (e instanceof ClusterGroupEmptyCheckedException)
- throw e;
-
- if (cnt++ == MAX_UPDATE_RETRIES)
- throw e;
- else {
- U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
- U.sleep(RETRY_DELAY);
- }
- }
- }
+ }).call();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 612c1f1..1ea5014 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -164,4 +164,4 @@ public interface DiscoverySpi extends IgniteSpi {
* @throws IllegalStateException If discovery SPI has not started.
*/
public boolean isClientMode() throws IllegalStateException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ae23d0e..ae3c8cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLException;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -2152,6 +2153,41 @@ class ServerImpl extends TcpDiscoveryImpl {
initConnectionCheckFrequency();
}
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ try {
+ super.body();
+ }
+ catch (Throwable e) {
+ if (!spi.isNodeStopping0()) {
+ final Ignite ignite = spi.ignite();
+
+ if (ignite != null) {
+ U.error(log, "TcpDiscoverySpi's message worker thread failed abnormally. " +
+ "Stopping the grid in order to prevent cluster wide instability.", e);
+
+ new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ spi.ignite().close();
+
+ U.log(log, "Stopped the grid successfully in response to TcpDiscoverySpi's " +
+ "message worker thread abnormal termination.");
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to stop the grid in response to TcpDiscoverySpi's " +
+ "message worker thread abnormal termination.", e);
+ }
+ }
+ }).start();
+ }
+ }
+
+ // Must be processed by IgniteSpiThread as well.
+ throw e;
+ }
+ }
+
/**
* Initializes connection check frequency. Used only when failure detection timeout is enabled.
*/
[3/3] ignite git commit: Merge branch 'ignite-1.5' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-1.5
Posted by dm...@apache.org.
Merge branch 'ignite-1.5' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-1.5
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f89347f0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f89347f0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f89347f0
Branch: refs/heads/ignite-1.5
Commit: f89347f0ea83a964dc3cd63cdcab3ab123fa510b
Parents: c711484 ab8298a
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Nov 20 19:29:18 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Nov 20 19:29:18 2015 +0300
----------------------------------------------------------------------
.../stream/camel/IgniteCamelStreamerTest.java | 16 +-
.../camel/IgniteCamelStreamerTestSuite.java | 1 -
.../internal/MarshallerContextAdapter.java | 4 +-
.../GridCachePartitionExchangeManager.java | 34 ++-
.../dht/GridClientPartitionTopology.java | 38 +--
.../distributed/dht/GridDhtLockFuture.java | 1 -
.../dht/GridDhtPartitionTopology.java | 12 +-
.../dht/GridDhtPartitionTopologyImpl.java | 45 +--
.../distributed/dht/GridDhtTxPrepareFuture.java | 1 -
.../dht/GridPartitionedGetFuture.java | 2 +-
.../colocated/GridDhtColocatedLockFuture.java | 1 -
.../dht/preloader/GridDhtPartitionFullMap.java | 41 ++-
.../dht/preloader/GridDhtPartitionMap.java | 148 +--------
.../dht/preloader/GridDhtPartitionMap2.java | 306 +++++++++++++++++++
.../GridDhtPartitionsExchangeFuture.java | 24 +-
.../GridDhtPartitionsSingleMessage.java | 6 +-
.../distributed/near/GridNearCacheAdapter.java | 1 -
.../distributed/near/GridNearGetFuture.java | 2 -
.../distributed/near/GridNearLockFuture.java | 2 -
...arOptimisticSerializableTxPrepareFuture.java | 3 -
.../near/GridNearOptimisticTxPrepareFuture.java | 1 -
.../near/GridNearTxFinishFuture.java | 4 -
.../distributed/near/GridNearTxRemote.java | 1 -
.../processors/cache/local/GridLocalCache.java | 1 -
.../cache/local/GridLocalLockFuture.java | 2 -
.../cache/transactions/IgniteInternalTx.java | 1 -
.../cache/transactions/IgniteTxAdapter.java | 1 -
.../transactions/IgniteTxLocalAdapter.java | 1 -
.../cache/transactions/IgniteTxManager.java | 1 -
.../cache/transactions/IgniteTxStateImpl.java | 1 -
.../ignite/internal/visor/cache/VisorCache.java | 6 +-
.../org/apache/ignite/stream/StreamAdapter.java | 1 -
.../resources/META-INF/classnames.properties | 1 +
.../dht/GridCacheDhtPreloadDelayedSelfTest.java | 12 +-
.../dht/GridCacheDhtPreloadSelfTest.java | 4 +-
.../distributed/dht/GridCacheDhtTestUtils.java | 8 +-
...cingDelayedPartitionMapExchangeSelfTest.java | 178 +++++++++++
.../testsuites/IgniteCacheTestSuite3.java | 2 +
.../h2/twostep/GridReduceQueryExecutor.java | 4 +-
.../cache/WaitMapExchangeFinishCallable.java | 4 +-
.../IgniteFailoverAbstractBenchmark.java | 4 +-
41 files changed, 661 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f89347f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f89347f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------