You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/12/10 04:22:12 UTC
[05/13] ignite git commit: ignite-1.5 Fixed asserts in
GridTransactionalCacheQueueImpl to take into account the case when queue data
was really lost.
ignite-1.5 Fixed asserts in GridTransactionalCacheQueueImpl to take into account the case when queue data was really lost.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/add61614
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/add61614
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/add61614
Branch: refs/heads/ignite-843-rc2
Commit: add6161451d3e8b263782e52c51a0e2b34daeb3f
Parents: 3016c0f
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 9 16:32:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 9 16:32:48 2015 +0300
----------------------------------------------------------------------
.../GridTransactionalCacheQueueImpl.java | 32 +++--
...eAbstractDataStructuresFailoverSelfTest.java | 138 +++++++++++++++----
2 files changed, 128 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/add61614/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 32e94d3..7b80765 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -96,22 +96,28 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
@Override public T call() throws Exception {
T retVal;
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
+ while (true) {
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
- if (idx != null) {
- checkRemoved(idx);
+ if (idx != null) {
+ checkRemoved(idx);
- retVal = (T)cache.getAndRemove(itemKey(idx));
+ retVal = (T)cache.getAndRemove(itemKey(idx));
- assert retVal != null : idx;
- }
- else
- retVal = null;
+ if (retVal == null) { // Possible if data was lost.
+ tx.commit();
- tx.commit();
+ continue;
+ }
+ }
+ else
+ retVal = null;
- return retVal;
+ tx.commit();
+
+ return retVal;
+ }
}
}
}).call();
@@ -188,9 +194,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
if (idx != null) {
checkRemoved(idx);
- boolean rmv = cache.remove(itemKey(idx));
-
- assert rmv : idx;
+ cache.remove(itemKey(idx));
}
tx.commit();
http://git-wip-us.apache.org/repos/asf/ignite/blob/add61614/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index cb16aeb..1e15c15 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -196,14 +196,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicLongConstantTopologyChange() throws Exception {
- doTestAtomicLong(new ConstantTopologyChangeWorker());
+ doTestAtomicLong(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
* @throws Exception If failed.
*/
public void testAtomicLongConstantMultipleTopologyChange() throws Exception {
- doTestAtomicLong(multipleTopologyChangeWorker());
+ doTestAtomicLong(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
@@ -258,14 +258,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicReferenceConstantTopologyChange() throws Exception {
- doTestAtomicReference(new ConstantTopologyChangeWorker());
+ doTestAtomicReference(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
* @throws Exception If failed.
*/
public void testAtomicReferenceConstantMultipleTopologyChange() throws Exception {
- doTestAtomicReference(multipleTopologyChangeWorker());
+ doTestAtomicReference(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
@@ -326,14 +326,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicStampedConstantTopologyChange() throws Exception {
- doTestAtomicStamped(new ConstantTopologyChangeWorker());
+ doTestAtomicStamped(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
* @throws Exception If failed.
*/
public void testAtomicStampedConstantMultipleTopologyChange() throws Exception {
- doTestAtomicStamped(multipleTopologyChangeWorker());
+ doTestAtomicStamped(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
@@ -687,14 +687,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testCountDownLatchConstantTopologyChange() throws Exception {
- doTestCountDownLatch(new ConstantTopologyChangeWorker());
+ doTestCountDownLatch(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
* @throws Exception If failed.
*/
public void testCountDownLatchConstantMultipleTopologyChange() throws Exception {
- doTestCountDownLatch(multipleTopologyChangeWorker());
+ doTestCountDownLatch(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
@@ -758,15 +758,73 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/**
* @throws Exception If failed.
*/
+ public void testQueueTopologyChange() throws Exception {
+ ConstantTopologyChangeWorker topWorker = new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT);
+
+ try (final IgniteQueue<Integer> q = grid(0).queue(STRUCTURE_NAME, 0, config(false))) {
+ for (int i = 0; i < 1000; i++)
+ q.add(i);
+
+ final IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+ @Override public Object apply(Ignite ignite) {
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<?> takeFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (!fut.isDone())
+ q.take();
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<?> pollFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (!fut.isDone())
+ q.poll();
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<?> addFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (!fut.isDone())
+ q.add(0);
+
+ return null;
+ }
+ });
+
+ fut.get();
+
+ pollFut.get();
+ addFut.get();
+
+ q.add(0);
+
+ takeFut.get();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testQueueConstantTopologyChange() throws Exception {
- doTestQueue(new ConstantTopologyChangeWorker());
+ int topChangeThreads = collectionCacheMode() == CacheMode.PARTITIONED ? 1 : TOP_CHANGE_THREAD_CNT;
+
+ doTestQueue(new ConstantTopologyChangeWorker(topChangeThreads));
}
/**
* @throws Exception If failed.
*/
public void testQueueConstantMultipleTopologyChange() throws Exception {
- doTestQueue(multipleTopologyChangeWorker());
+ int topChangeThreads = collectionCacheMode() == CacheMode.PARTITIONED ? 1 : TOP_CHANGE_THREAD_CNT;
+
+ doTestQueue(multipleTopologyChangeWorker(topChangeThreads));
}
/**
@@ -902,14 +960,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
* @throws Exception If failed.
*/
public void testAtomicSequenceConstantTopologyChange() throws Exception {
- doTestAtomicSequence(new ConstantTopologyChangeWorker());
+ doTestAtomicSequence(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
* @throws Exception If failed.
*/
public void testAtomicSequenceConstantMultipleTopologyChange() throws Exception {
- doTestAtomicSequence(multipleTopologyChangeWorker());
+ doTestAtomicSequence(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
}
/**
@@ -977,11 +1035,14 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
}
/**
+ * @param topChangeThreads Number of topology change threads.
+ *
* @return Specific multiple topology change worker implementation.
*/
- private ConstantTopologyChangeWorker multipleTopologyChangeWorker() {
- return collectionCacheMode() == CacheMode.PARTITIONED ? new PartitionedMultipleTopologyChangeWorker() :
- new MultipleTopologyChangeWorker();
+ private ConstantTopologyChangeWorker multipleTopologyChangeWorker(int topChangeThreads) {
+ return collectionCacheMode() == CacheMode.PARTITIONED ?
+ new PartitionedMultipleTopologyChangeWorker(topChangeThreads) :
+ new MultipleTopologyChangeWorker(topChangeThreads);
}
/**
@@ -991,13 +1052,24 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
/** */
protected final AtomicBoolean failed = new AtomicBoolean(false);
+ /** */
+ private final int topChangeThreads;
+
+ /**
+ * @param topChangeThreads Number of topology change threads.
+ */
+ public ConstantTopologyChangeWorker(int topChangeThreads) {
+ this.topChangeThreads = topChangeThreads;
+ }
+
/**
* Starts changing cluster's topology.
*
+ * @param cb Callback to run after node start.
* @return Future.
*/
- IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) {
+ return GridTestUtils.runMultiThreadedAsync(new CA() {
@Override public void apply() {
try {
for (int i = 0; i < TOP_CHANGE_CNT; i++) {
@@ -1011,7 +1083,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
Ignite g = startGrid(name);
- callback.apply(g);
+ cb.apply(g);
}
finally {
if (i != TOP_CHANGE_CNT - 1)
@@ -1024,9 +1096,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
throw F.wrap(e);
}
}
- }, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- return fut;
+ }, topChangeThreads, "topology-change-thread");
}
}
@@ -1035,12 +1105,19 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
*/
private class MultipleTopologyChangeWorker extends ConstantTopologyChangeWorker {
/**
+ * @param topChangeThreads Number of topology change threads.
+ */
+ public MultipleTopologyChangeWorker(int topChangeThreads) {
+ super(topChangeThreads);
+ }
+
+ /**
* Starts changing cluster's topology.
*
* @return Future.
*/
- @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+ @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) {
+ return GridTestUtils.runMultiThreadedAsync(new CA() {
@Override public void apply() {
try {
for (int i = 0; i < TOP_CHANGE_CNT; i++) {
@@ -1062,7 +1139,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
names.add(name);
- callback.apply(g);
+ cb.apply(g);
}
}
finally {
@@ -1079,8 +1156,6 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
}
}
}, TOP_CHANGE_THREAD_CNT, "topology-change-thread");
-
- return fut;
}
}
@@ -1092,11 +1167,18 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
private CyclicBarrier barrier;
/**
+ * @param topChangeThreads Number of topology change threads.
+ */
+ public PartitionedMultipleTopologyChangeWorker(int topChangeThreads) {
+ super(topChangeThreads);
+ }
+
+ /**
* Starts changing cluster's topology.
*
* @return Future.
*/
- @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> callback) {
+ @Override IgniteInternalFuture<?> startChangingTopology(final IgniteClosure<Ignite, ?> cb) {
final Semaphore sem = new Semaphore(TOP_CHANGE_THREAD_CNT);
final ConcurrentSkipListSet<String> startedNodes = new ConcurrentSkipListSet<>();
@@ -1151,7 +1233,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
Ignite g = startGrid(name);
- callback.apply(g);
+ cb.apply(g);
}
try {