You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/27 16:49:07 UTC
[16/38] incubator-ignite git commit: # ignite-23
# ignite-23
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f903ff3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f903ff3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f903ff3c
Branch: refs/heads/ignite-709_2
Commit: f903ff3cbf837e5c5e1c86d114bea250419bdbfe
Parents: f3c6855
Author: sboikov <se...@inria.fr>
Authored: Fri May 22 06:51:42 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Fri May 22 06:59:50 2015 +0300
----------------------------------------------------------------------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +-
.../colocated/GridDhtColocatedLockFuture.java | 11 ++--
.../distributed/near/GridNearLockFuture.java | 11 ++--
...niteCacheClientNodeChangingTopologyTest.java | 62 ++++++++++++--------
4 files changed, 52 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f903ff3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index b7c4719..f78ced3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -343,7 +343,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
*/
public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (res.remapKeys() != null) {
- assert cctx.config().getAtomicWriteOrderMode() == PRIMARY || cctx.kernalContext().clientNode();
+ assert !fastMap || cctx.kernalContext().clientNode();
mapOnTopology(res.remapKeys(), true, nodeId, true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f903ff3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index a90c6e4..788a101 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -326,13 +326,14 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
* Undoes all locks.
*
* @param dist If {@code true}, then remove locks from remote nodes as well.
+ * @param rollback {@code True} if should rollback tx.
*/
- private void undoLocks(boolean dist) {
+ private void undoLocks(boolean dist, boolean rollback) {
// Transactions will undo during rollback.
if (dist && tx == null)
cctx.colocated().removeLocks(threadId, lockVer, keys);
else {
- if (tx != null) {
+ if (rollback && tx != null) {
if (tx.setRollbackOnly()) {
if (log.isDebugEnabled())
log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
@@ -350,7 +351,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
* @param dist {@code True} if need to distribute lock release.
*/
private void onFailed(boolean dist) {
- undoLocks(dist);
+ undoLocks(dist, true);
complete(false);
}
@@ -475,7 +476,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
", fut=" + this + ']');
if (!success)
- undoLocks(distribute);
+ undoLocks(distribute, true);
if (tx != null)
cctx.tm().txContext(tx);
@@ -1369,6 +1370,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
*
*/
private void remap() {
+ undoLocks(false, false);
+
mapOnTopology(true);
onDone(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f903ff3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 92498f0..001c78c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -349,13 +349,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* Undoes all locks.
*
* @param dist If {@code true}, then remove locks from remote nodes as well.
+ * @param rollback {@code True} if should rollback tx.
*/
- private void undoLocks(boolean dist) {
+ private void undoLocks(boolean dist, boolean rollback) {
// Transactions will undo during rollback.
if (dist && tx == null)
cctx.nearTx().removeLocks(lockVer, keys);
else {
- if (tx != null) {
+ if (rollback && tx != null) {
if (tx.setRollbackOnly()) {
if (log.isDebugEnabled())
log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
@@ -396,7 +397,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* @param dist {@code True} if need to distribute lock release.
*/
private void onFailed(boolean dist) {
- undoLocks(dist);
+ undoLocks(dist, true);
complete(false);
}
@@ -606,7 +607,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
", fut=" + this + ']');
if (!success)
- undoLocks(distribute);
+ undoLocks(distribute, true);
if (tx != null)
cctx.tm().txContext(tx);
@@ -1512,6 +1513,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
*
*/
private void remap() {
+ undoLocks(false, false);
+
mapOnTopology(true);
onDone(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f903ff3c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 20f3d58..0236446 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -166,7 +166,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, 4);
+ checkData(map, cache, 4);
ignite3.close();
@@ -200,14 +200,14 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, 4);
+ checkData(map, cache, 4);
for (int i = 0; i < 100; i++)
map.put(i, i + 2);
cache.putAll(map);
- checkData(map, 4);
+ checkData(map, cache, 4);
}
/**
@@ -265,7 +265,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, 4);
+ checkData(map, cache, 4);
map.clear();
@@ -274,7 +274,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
cache.putAll(map);
- checkData(map, 4);
+ checkData(map, cache, 4);
}
/**
* @throws Exception If failed.
@@ -286,7 +286,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
/**
* @throws Exception If failed.
*/
- public void _testPessimisticTxNearEnabled() throws Exception {
+ public void testPessimisticTxNearEnabled() throws Exception {
pessimisticTx(new NearCacheConfiguration());
}
@@ -351,7 +351,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, 4);
+ checkData(map, cache, 4);
ignite3.close();
@@ -384,7 +384,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
putFut.get();
- checkData(map, 4);
+ checkData(map, cache, 4);
for (int i = 0; i < 100; i++)
map.put(i, i + 2);
@@ -395,7 +395,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
tx.commit();
}
- checkData(map, 4);
+ checkData(map, cache, 4);
}
/**
@@ -448,7 +448,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
checkClientPrepareMessages(spi.recordedMessages(), 6);
- checkData(map, 4);
+ checkData(map, cache, 4);
cache.putAll(map);
@@ -456,7 +456,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
spi.record(null);
- checkData(map, 4);
+ checkData(map, cache, 4);
IgniteCache<Integer, Integer> cache0 = ignite0.cache(null);
@@ -475,7 +475,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
for (Object msg : msgs)
assertFalse(((GridNearTxPrepareRequest)msg).firstClientRequest());
- checkData(map, 4);
+ checkData(map, cache, 4);
}
/**
@@ -546,13 +546,19 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
* @param expNodes Expected nodes number.
* @throws Exception If failed.
*/
- private void checkData(final Map<Integer, Integer> map, final int expNodes) throws Exception {
+ private void checkData(final Map<Integer, Integer> map, IgniteCache<?, ?> clientCache, final int expNodes)
+ throws Exception
+ {
final List<Ignite> nodes = G.allGrids();
final Affinity<Integer> aff = nodes.get(0).affinity(null);
assertEquals(expNodes, nodes.size());
+ boolean hasNearCache = clientCache.getConfiguration(CacheConfiguration.class).getNearConfiguration() != null;
+
+ final Ignite nearCacheNode = hasNearCache ? clientCache.unwrap(Ignite.class) : null;
+
boolean wait = GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
try {
@@ -562,7 +568,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
for (Ignite node : nodes) {
IgniteCache<Integer, Integer> cache = node.cache(null);
- if (aff.isPrimaryOrBackup(node.cluster().localNode(), key))
+ if (aff.isPrimaryOrBackup(node.cluster().localNode(), key) || node == nearCacheNode)
assertEquals("Unexpected value for " + node.name(), e.getValue(), cache.localPeek(key));
else
assertNull("Unexpected non-null value for " + node.name(), cache.localPeek(key));
@@ -744,16 +750,18 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
}
});
- updateBarrier.await(15_000, TimeUnit.MILLISECONDS);
-
- CyclicBarrier barrier0 = updateBarrier;
-
- if (barrier0 != null) {
+ try {
+ updateBarrier.await(30_000, TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e) {
log.info("Failed to wait for update.");
U.dumpThreads(log);
- barrier0.reset();
+ CyclicBarrier barrier0 = updateBarrier;
+
+ if (barrier0 != null)
+ barrier0.reset();
fail("Failed to wait for update.");
}
@@ -770,16 +778,20 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
}
});
- updateBarrier.await(15_000, TimeUnit.MILLISECONDS);
-
- barrier0 = updateBarrier;
+ updateBarrier.await(30_000, TimeUnit.MILLISECONDS);
- if (barrier0 != null) {
+ try {
+ updateBarrier.await(30_000, TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e) {
log.info("Failed to wait for update.");
U.dumpThreads(log);
- barrier0.reset();
+ CyclicBarrier barrier0 = updateBarrier;
+
+ if (barrier0 != null)
+ barrier0.reset();
fail("Failed to wait for update.");
}