You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 14:46:24 UTC

[17/50] incubator-ignite git commit: # ignite-23

# ignite-23


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/00eadd62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/00eadd62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/00eadd62

Branch: refs/heads/ignite-929
Commit: 00eadd62ea451922e9fe3396085f5f003f36c32e
Parents: e4e54ba
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 25 13:34:13 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 25 13:34:13 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 49 +++++++++---
 ...niteCacheClientNodeChangingTopologyTest.java | 78 ++++++++++++++++++++
 2 files changed, 115 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00eadd62/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 50c3d56..64a4882 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -143,6 +143,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /** Skip store flag. */
     private final boolean skipStore;
 
+    /** */
+    private boolean fastMapRemap;
+
     /**
      * @param cctx Cache context.
      * @param cache Cache instance.
@@ -345,7 +348,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (res.remapKeys() != null) {
             assert !fastMap || cctx.kernalContext().clientNode();
 
-            Collection<?> remapKeys = fastMap && cctx.kernalContext().clientNode() ? null : res.remapKeys();
+            Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
 
             mapOnTopology(remapKeys, true, nodeId, true);
 
@@ -456,8 +459,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             else {
                 if (waitTopFut) {
                     fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override
-                        public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                             mapOnTopology(keys, remap, oldNodeId, waitTopFut);
                         }
                     });
@@ -478,15 +480,29 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /**
      * Checks if future is ready to be completed.
      */
-    private synchronized void checkComplete() {
-        if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
-            CachePartialUpdateCheckedException err0 = err;
+    private void checkComplete() {
+        boolean remap = false;
 
-            if (err0 != null)
-                onDone(err0);
-            else
-                onDone(opRes);
+        synchronized (this) {
+            if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) {
+                CachePartialUpdateCheckedException err0 = err;
+
+                if (err0 != null)
+                    onDone(err0);
+                else {
+                    if (fastMapRemap) {
+                        assert cctx.kernalContext().clientNode();
+
+                        remap = true;
+                    }
+                    else
+                        onDone(opRes);
+                }
+            }
         }
+
+        if (remap)
+            mapOnTopology(null, true, null, true);
     }
 
     /**
@@ -500,7 +516,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         @Nullable Collection<?> remapKeys,
         boolean remap,
         @Nullable UUID oldNodeId) {
-        assert oldNodeId == null || remap;
+        assert oldNodeId == null || remap || fastMapRemap;
 
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
@@ -652,9 +668,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         // Must do this in synchronized block because we need to atomically remove and add mapping.
         // Otherwise checkComplete() may see empty intermediate state.
         synchronized (this) {
-            if (remap)
+            if (oldNodeId != null)
                 removeMapping(oldNodeId);
 
+            // For fastMap mode wait for all responses before remapping.
+            if (remap && fastMap && !mappings.isEmpty()) {
+                fastMapRemap = true;
+
+                return;
+            }
+
             // Create mappings first, then send messages.
             for (Object key : keys) {
                 if (key == null) {
@@ -772,6 +795,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     i++;
                 }
             }
+
+            fastMapRemap = false;
         }
 
         if ((single == null || single) && pendingMappings.size() == 1) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00eadd62/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 4603aaf..5a5a648 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -264,6 +264,84 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
     /**
      * @throws Exception If failed.
      */
+    public void testAtomicGetAndPutClockMode() throws Exception {
+        atomicGetAndPut(CLOCK);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicGetAndPutPrimaryMode() throws Exception {
+        atomicGetAndPut(PRIMARY);
+    }
+
+    /**
+     * @param writeOrder Write order.
+     * @throws Exception If failed.
+     */
+    private void atomicGetAndPut(CacheAtomicWriteOrderMode writeOrder) throws Exception {
+        ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(writeOrder);
+        ccfg.setRebalanceMode(SYNC);
+
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        client = true;
+
+        ignite0.cache(null).put(0, 0);
+
+        Ignite ignite2 = startGrid(2);
+
+        assertTrue(ignite2.configuration().isClientMode());
+
+        final Map<Integer, Integer> map = new HashMap<>();
+
+        map.put(0, 1);
+
+        TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+        // Block messages requests for both nodes.
+        spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
+        spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
+
+        final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
+
+        assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
+
+        IgniteInternalFuture<Integer> putFut = GridTestUtils.runAsync(new Callable<Integer>() {
+            @Override public Integer call() throws Exception {
+                Thread.currentThread().setName("put-thread");
+
+                return cache.getAndPut(0, 1);
+            }
+        });
+
+        assertFalse(putFut.isDone());
+
+        client = false;
+
+        startGrid(3);
+
+        log.info("Stop block.");
+
+        spi.stopBlock();
+
+        Integer old = putFut.get();
+
+        checkData(map, cache, 4);
+
+        assertEquals((Object)0, old);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testTxPutAll() throws Exception {
         ccfg = new CacheConfiguration();