You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/28 08:40:52 UTC

ignite git commit: ignite-1304 Changed GridNearOptimisticTxPrepareFuture so that it does not execute the whole prepare process with the topology read lock held

Repository: ignite
Updated Branches:
  refs/heads/ignite-1304 [created] a8e286e69


ignite-1304 Changed GridNearOptimisticTxPrepareFuture so that it does not execute the whole prepare process with the topology read lock held


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8e286e6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8e286e6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8e286e6

Branch: refs/heads/ignite-1304
Commit: a8e286e69d51d9886852e1f925562155399625e2
Parents: 3e30c86
Author: sboikov <sb...@gridgain.com>
Authored: Fri Aug 28 09:30:45 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Aug 28 09:40:06 2015 +0300

----------------------------------------------------------------------
 .../near/GridNearOptimisticTxPrepareFuture.java | 87 ++++++++++---------
 ...acheAsyncOperationsFailoverAbstractTest.java | 91 +++++++++++++-------
 2 files changed, 110 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a8e286e6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 8e66cb6..2d6b2a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -227,6 +227,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         if (topVer != null) {
             tx.topologyVersion(topVer);
 
+            cctx.mvcc().addFuture(this);
+
             prepare0(false);
 
             return;
@@ -242,6 +244,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
     private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) {
         GridDhtTopologyFuture topFut = topologyReadLock();
 
+        AffinityTopologyVersion topVer = null;
+
         try {
             if (topFut == null) {
                 assert isDone();
@@ -250,52 +254,61 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
             }
 
             if (topFut.isDone()) {
-                StringBuilder invalidCaches = new StringBuilder();
+                topVer = topFut.topologyVersion();
 
-                boolean cacheInvalid = false;
+                if (remap)
+                    tx.onRemap(topVer);
+                else
+                    tx.topologyVersion(topVer);
 
-                for (GridCacheContext ctx : cctx.cacheContexts()) {
-                    if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) {
-                        if (cacheInvalid)
-                            invalidCaches.append(", ");
+                if (!remap)
+                    cctx.mvcc().addFuture(this);
+            }
+        }
+        finally {
+            topologyReadUnlock();
+        }
 
-                        invalidCaches.append(U.maskName(ctx.name()));
+        if (topVer != null) {
+            StringBuilder invalidCaches = null;
 
-                        cacheInvalid = true;
-                    }
-                }
+            for (Integer cacheId : tx.activeCacheIds()) {
+                GridCacheContext ctx = cctx.cacheContext(cacheId);
 
-                if (cacheInvalid) {
-                    onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
-                        invalidCaches.toString()));
+                assert ctx != null : cacheId;
 
-                    return;
-                }
+                if (!topFut.isCacheTopologyValid(ctx)) {
+                    if (invalidCaches != null)
+                        invalidCaches.append(", ");
+                    else
+                        invalidCaches = new StringBuilder();
 
-                if (remap)
-                    tx.onRemap(topFut.topologyVersion());
-                else
-                    tx.topologyVersion(topFut.topologyVersion());
+                    invalidCaches.append(U.maskName(ctx.name()));
+                }
+            }
 
-                prepare0(remap);
+            if (invalidCaches != null) {
+                onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+                    invalidCaches.toString()));
 
-                if (c != null)
-                    c.run();
-            }
-            else {
-                topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                        cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
-                            @Override public void run() {
-                                prepareOnTopology(remap, c);
-                            }
-                        });
-                    }
-                });
+                return;
             }
+
+            prepare0(remap);
+
+            if (c != null)
+                c.run();
         }
-        finally {
-            topologyReadUnlock();
+        else {
+            topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                    cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+                        @Override public void run() {
+                            prepareOnTopology(remap, c);
+                        }
+                    });
+                }
+            });
         }
     }
 
@@ -382,10 +395,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                 return;
             }
 
-            // Make sure to add future before calling prepare.
-            if (!remap)
-                cctx.mvcc().addFuture(this);
-
             prepare(
                 tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
                 tx.writeEntries());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8e286e6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
index 1669404..36eb9a7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
@@ -60,6 +60,7 @@ public abstract class CacheAsyncOperationsFailoverAbstractTest extends GridCache
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
         CacheConfiguration ccfg = super.cacheConfiguration(gridName);
 
@@ -93,59 +94,91 @@ public abstract class CacheAsyncOperationsFailoverAbstractTest extends GridCache
      * @throws Exception If failed.
      */
     public void testAsyncFailover() throws Exception {
-        for (int i = 0; i < 3; i++) {
+        IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+
+        int ops = cache.getConfiguration(CacheConfiguration.class).getMaxConcurrentAsyncOperations();
+
+        log.info("Max concurrent async operations: " + ops);
+
+        assertTrue(ops > 0);
+
+        // Start/stop one node.
+        for (int i = 0; i < 2; i++) {
             log.info("Iteration: " + i);
 
             startGrid(NODE_CNT);
 
-            final IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+            List<IgniteFuture<?>> futs = startAsyncOperations(ops, cache);
 
-            int ops = cache.getConfiguration(CacheConfiguration.class).getMaxConcurrentAsyncOperations();
+            stopGrid(NODE_CNT);
 
-            log.info("Max concurrent async operations: " + ops);
+            for (IgniteFuture<?> fut : futs)
+                fut.get();
 
-            assertTrue(ops > 0);
+            log.info("Iteration done: " + i);
+        }
 
-            final List<IgniteFuture<?>> futs = Collections.synchronizedList(new ArrayList<IgniteFuture<?>>(ops));
+        // Stop all nodes except one.
+        try {
+            List<IgniteFuture<?>> futs = startAsyncOperations(ops, cache);
 
-            final AtomicInteger left = new AtomicInteger(ops);
+            for (int i = 1; i < NODE_CNT; i++)
+                stopGrid(i);
 
-            GridTestUtils.runMultiThreaded(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    List<IgniteFuture<?>> futs0 = new ArrayList<>();
+            for (IgniteFuture<?> fut : futs)
+                fut.get();
+        }
+        finally {
+            for (int i = 1; i < NODE_CNT; i++)
+                startGrid(i);
+        }
+    }
 
-                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+    /**
+     * @param ops Number of operations.
+     * @param cache Cache.
+     * @return Futures.
+     * @throws Exception If failed.
+     */
+    private List<IgniteFuture<?>> startAsyncOperations(final int ops, final IgniteCache<TestKey, TestValue> cache)
+        throws Exception
+    {
+        final List<IgniteFuture<?>> futs = Collections.synchronizedList(new ArrayList<IgniteFuture<?>>(ops));
 
-                    while (left.getAndDecrement() > 0) {
-                        TreeMap<TestKey, TestValue> map = new TreeMap<>();
+        final AtomicInteger left = new AtomicInteger(ops);
 
-                        int keys = 50;
+        GridTestUtils.runMultiThreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                List<IgniteFuture<?>> futs0 = new ArrayList<>();
 
-                        for (int k = 0; k < keys; k++)
-                            map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(k));
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                        cache.putAll(map);
+                while (left.getAndDecrement() > 0) {
+                    TreeMap<TestKey, TestValue> map = new TreeMap<>();
 
-                        IgniteFuture<?> fut = cache.future();
+                    int keys = 50;
 
-                        assertNotNull(fut);
+                    for (int k = 0; k < keys; k++)
+                        map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(k));
 
-                        futs0.add(fut);
-                    }
+                    cache.putAll(map);
 
-                    futs.addAll(futs0);
+                    IgniteFuture<?> fut = cache.future();
 
-                    return null;
+                    assertNotNull(fut);
+
+                    futs0.add(fut);
                 }
-            }, 10, "put-thread");
 
-            stopGrid(NODE_CNT);
+                futs.addAll(futs0);
+
+                return null;
+            }
+        }, 10, "put-thread");
 
-            assertEquals(ops, futs.size());
+        assertEquals(ops, futs.size());
 
-            for (IgniteFuture<?> fut : futs)
-                fut.get();
-        }
+        return futs;
     }
 
     /**