You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/31 04:01:31 UTC
[48/50] [abbrv] ignite git commit: ignite-1304 Changed
GridNearOptimisticTxPrepareFuture to not execute the whole prepare process
while holding the topology read lock
ignite-1304 Changed GridNearOptimisticTxPrepareFuture to not execute the whole prepare process while holding the topology read lock
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/37a0505c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/37a0505c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/37a0505c
Branch: refs/heads/ignite-843
Commit: 37a0505c11136321e91caa6db41e74c6ef89b734
Parents: e2f522b
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 28 16:57:23 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 28 16:57:23 2015 +0300
----------------------------------------------------------------------
.../near/GridNearOptimisticTxPrepareFuture.java | 87 ++++++++++---------
...acheAsyncOperationsFailoverAbstractTest.java | 91 +++++++++++++-------
2 files changed, 110 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/37a0505c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 8e66cb6..2d6b2a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -227,6 +227,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
if (topVer != null) {
tx.topologyVersion(topVer);
+ cctx.mvcc().addFuture(this);
+
prepare0(false);
return;
@@ -242,6 +244,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) {
GridDhtTopologyFuture topFut = topologyReadLock();
+ AffinityTopologyVersion topVer = null;
+
try {
if (topFut == null) {
assert isDone();
@@ -250,52 +254,61 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
}
if (topFut.isDone()) {
- StringBuilder invalidCaches = new StringBuilder();
+ topVer = topFut.topologyVersion();
- boolean cacheInvalid = false;
+ if (remap)
+ tx.onRemap(topVer);
+ else
+ tx.topologyVersion(topVer);
- for (GridCacheContext ctx : cctx.cacheContexts()) {
- if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) {
- if (cacheInvalid)
- invalidCaches.append(", ");
+ if (!remap)
+ cctx.mvcc().addFuture(this);
+ }
+ }
+ finally {
+ topologyReadUnlock();
+ }
- invalidCaches.append(U.maskName(ctx.name()));
+ if (topVer != null) {
+ StringBuilder invalidCaches = null;
- cacheInvalid = true;
- }
- }
+ for (Integer cacheId : tx.activeCacheIds()) {
+ GridCacheContext ctx = cctx.cacheContext(cacheId);
- if (cacheInvalid) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- invalidCaches.toString()));
+ assert ctx != null : cacheId;
- return;
- }
+ if (!topFut.isCacheTopologyValid(ctx)) {
+ if (invalidCaches != null)
+ invalidCaches.append(", ");
+ else
+ invalidCaches = new StringBuilder();
- if (remap)
- tx.onRemap(topFut.topologyVersion());
- else
- tx.topologyVersion(topFut.topologyVersion());
+ invalidCaches.append(U.maskName(ctx.name()));
+ }
+ }
- prepare0(remap);
+ if (invalidCaches != null) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+ invalidCaches.toString()));
- if (c != null)
- c.run();
- }
- else {
- topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
- @Override public void run() {
- prepareOnTopology(remap, c);
- }
- });
- }
- });
+ return;
}
+
+ prepare0(remap);
+
+ if (c != null)
+ c.run();
}
- finally {
- topologyReadUnlock();
+ else {
+ topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+ @Override public void run() {
+ prepareOnTopology(remap, c);
+ }
+ });
+ }
+ });
}
}
@@ -382,10 +395,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
return;
}
- // Make sure to add future before calling prepare.
- if (!remap)
- cctx.mvcc().addFuture(this);
-
prepare(
tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
tx.writeEntries());
http://git-wip-us.apache.org/repos/asf/ignite/blob/37a0505c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
index 1669404..36eb9a7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
@@ -60,6 +60,7 @@ public abstract class CacheAsyncOperationsFailoverAbstractTest extends GridCache
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
CacheConfiguration ccfg = super.cacheConfiguration(gridName);
@@ -93,59 +94,91 @@ public abstract class CacheAsyncOperationsFailoverAbstractTest extends GridCache
* @throws Exception If failed.
*/
public void testAsyncFailover() throws Exception {
- for (int i = 0; i < 3; i++) {
+ IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+
+ int ops = cache.getConfiguration(CacheConfiguration.class).getMaxConcurrentAsyncOperations();
+
+ log.info("Max concurrent async operations: " + ops);
+
+ assertTrue(ops > 0);
+
+ // Start/stop one node.
+ for (int i = 0; i < 2; i++) {
log.info("Iteration: " + i);
startGrid(NODE_CNT);
- final IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+ List<IgniteFuture<?>> futs = startAsyncOperations(ops, cache);
- int ops = cache.getConfiguration(CacheConfiguration.class).getMaxConcurrentAsyncOperations();
+ stopGrid(NODE_CNT);
- log.info("Max concurrent async operations: " + ops);
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
- assertTrue(ops > 0);
+ log.info("Iteration done: " + i);
+ }
- final List<IgniteFuture<?>> futs = Collections.synchronizedList(new ArrayList<IgniteFuture<?>>(ops));
+ // Stop all nodes except one.
+ try {
+ List<IgniteFuture<?>> futs = startAsyncOperations(ops, cache);
- final AtomicInteger left = new AtomicInteger(ops);
+ for (int i = 1; i < NODE_CNT; i++)
+ stopGrid(i);
- GridTestUtils.runMultiThreaded(new Callable<Object>() {
- @Override public Object call() throws Exception {
- List<IgniteFuture<?>> futs0 = new ArrayList<>();
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+ }
+ finally {
+ for (int i = 1; i < NODE_CNT; i++)
+ startGrid(i);
+ }
+ }
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ /**
+ * @param ops Number of operations.
+ * @param cache Cache.
+ * @return Futures.
+ * @throws Exception If failed.
+ */
+ private List<IgniteFuture<?>> startAsyncOperations(final int ops, final IgniteCache<TestKey, TestValue> cache)
+ throws Exception
+ {
+ final List<IgniteFuture<?>> futs = Collections.synchronizedList(new ArrayList<IgniteFuture<?>>(ops));
- while (left.getAndDecrement() > 0) {
- TreeMap<TestKey, TestValue> map = new TreeMap<>();
+ final AtomicInteger left = new AtomicInteger(ops);
- int keys = 50;
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ List<IgniteFuture<?>> futs0 = new ArrayList<>();
- for (int k = 0; k < keys; k++)
- map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(k));
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- cache.putAll(map);
+ while (left.getAndDecrement() > 0) {
+ TreeMap<TestKey, TestValue> map = new TreeMap<>();
- IgniteFuture<?> fut = cache.future();
+ int keys = 50;
- assertNotNull(fut);
+ for (int k = 0; k < keys; k++)
+ map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(k));
- futs0.add(fut);
- }
+ cache.putAll(map);
- futs.addAll(futs0);
+ IgniteFuture<?> fut = cache.future();
- return null;
+ assertNotNull(fut);
+
+ futs0.add(fut);
}
- }, 10, "put-thread");
- stopGrid(NODE_CNT);
+ futs.addAll(futs0);
+
+ return null;
+ }
+ }, 10, "put-thread");
- assertEquals(ops, futs.size());
+ assertEquals(ops, futs.size());
- for (IgniteFuture<?> fut : futs)
- fut.get();
- }
+ return futs;
}
/**