You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/07 14:39:09 UTC

[2/3] ignite git commit: ignite-4705

ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28d4779b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28d4779b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28d4779b

Branch: refs/heads/ignite-4705-debug
Commit: 28d4779b318a688a9d36371c7d1d5ab7b254b5d5
Parents: 2b38fd9
Author: sboikov <sb...@gridgain.com>
Authored: Tue Mar 7 17:07:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Mar 7 17:36:15 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |   8 ++
 .../dht/atomic/GridDhtAtomicCache.java          |  14 ++-
 .../GridNearAtomicAbstractUpdateFuture.java     |  41 +++++--
 .../GridNearAtomicAbstractUpdateRequest.java    |   8 ++
 .../GridNearAtomicSingleUpdateFuture.java       |   4 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   8 +-
 .../atomic/GridNearAtomicUpdateResponse.java    |  24 ++--
 .../atomic/IgniteCacheAtomicProtocolTest.java   | 116 +++++++++++++++++++
 8 files changed, 196 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 50820ae..a1b94a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -590,6 +590,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     ctx.cacheId(),
                     nodeId,
                     req.futureId(),
+                    req.partition(),
+                    false,
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -768,6 +770,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     ctx.cacheId(),
                     nodeId,
                     req.futureId(),
+                    req.partition(),
+                    false,
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -784,6 +788,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     ctx.cacheId(),
                     nodeId,
                     req.futureId(),
+                    req.partition(),
+                    false,
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());
@@ -800,6 +806,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     ctx.cacheId(),
                     nodeId,
                     req.futureId(),
+                    req.partition(),
+                    false,
                     ctx.deploymentEnabled());
 
                 res.error(req.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d6fdd10..f216a3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1708,6 +1708,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             nodeId,
             req.futureId(),
+            req.partition(),
+            false,
             ctx.deploymentEnabled());
 
         res.addFailedKeys(req.keys(), e);
@@ -1736,11 +1738,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             return;
         }
 
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureId(),
+        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+            nodeId,
+            req.futureId(),
+            req.partition(),
+            false,
             ctx.deploymentEnabled());
 
-        res.partition(req.partition());
-
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
 
         GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -3083,10 +3087,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             nodeId,
             checkReq.futureId(),
+            checkReq.partition(),
+            false,
             false);
 
-        res.partition(checkReq.partition());
-
         GridCacheReturn ret = new GridCacheReturn(false, true);
 
         res.returnValue(ret);

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 6e98502..8d97732 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -358,6 +358,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
             req.nodeId(),
             req.futureId(),
+            req.partition(),
+            true,
             cctx.deploymentEnabled());
 
         ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
@@ -378,6 +380,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
             req.nodeId(),
             req.futureId(),
+            req.partition(),
+            e instanceof ClusterTopologyCheckedException,
             cctx.deploymentEnabled());
 
         res.addFailedKeys(req.keys(), e);
@@ -387,9 +391,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
     final void onSendError(GridNearAtomicCheckUpdateRequest req, IgniteCheckedException e) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
-                req.updateRequest().nodeId(),
-                req.futureId(),
-                cctx.deploymentEnabled());
+            req.updateRequest().nodeId(),
+            req.futureId(),
+            req.partition(),
+            e instanceof ClusterTopologyCheckedException,
+            cctx.deploymentEnabled());
 
         res.addFailedKeys(req.updateRequest().keys(), e);
 
@@ -497,17 +503,35 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         }
 
         /**
-         * @param nodeId Node ID.
-         * @return Request if need process primary response, {@code null} otherwise.
+         * @return Request if need process primary fail response, {@code null} otherwise.
          */
-        @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId) {
+        @Nullable GridNearAtomicAbstractUpdateRequest onPrimaryFail() {
             if (finished())
                 return null;
 
-            if (req != null && req.nodeId().equals(nodeId) && req.response() == null)
+            if (req.fullSync() && !req.nodeFailedResponse()) {
+                req.resetResponse();
+
                 return req;
+            }
+
+            return req.response() == null ? req : null;
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @return Request if need process primary response, {@code null} otherwise.
+         */
+        @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
+            assert req.nodeId().equals(nodeId);
+
+            if (res.nodeLeftResponse())
+                return onPrimaryFail();
+
+            if (finished())
+                return null;
 
-            return null;
+            return req.response() == null ? req : null;
         }
 
         /**
@@ -558,6 +582,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
         /**
          * @param res Response.
+         * @param cctx Cache context.
          * @return {@code True} if request processing finished.
          */
         boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 4f288b1..23301c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -265,6 +265,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         return false;
     }
 
+    void resetResponse() {
+        this.res = null;
+    }
+
     /**
      * @return Response.
      */
@@ -272,6 +276,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         return res;
     }
 
+    boolean nodeFailedResponse() {
+        return res != null && res.nodeLeftResponse();
+    }
+
     /**
      * @return Topology locked flag.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 6b88bcf..f69d31d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -146,7 +146,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 return false;
 
             if (reqState.req.nodeId.equals(nodeId)) {
-                GridNearAtomicAbstractUpdateRequest req = reqState.processPrimaryResponse(nodeId);
+                GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail();
 
                 if (req != null) {
                     GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
@@ -261,7 +261,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (futId == null || futId != res.futureId())
                 return;
 
-            req = reqState.processPrimaryResponse(nodeId);
+            req = reqState.processPrimaryResponse(nodeId, res);
 
             if (req == null)
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 60caa14..a44ccf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -172,7 +172,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
             if (singleReq != null) {
                 if (singleReq.req.nodeId.equals(nodeId)) {
-                    GridNearAtomicAbstractUpdateRequest req = singleReq.processPrimaryResponse(nodeId);
+                    GridNearAtomicAbstractUpdateRequest req = singleReq.onPrimaryFail();
 
                     if (req != null) {
                         rcvAll = true;
@@ -211,7 +211,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     boolean reqDone = false;
 
                     if (e.getKey().equals(nodeId)) {
-                        GridNearAtomicAbstractUpdateRequest req = reqState.processPrimaryResponse(nodeId);
+                        GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail();
 
                         if (req != null) {
                             reqDone = true;
@@ -377,7 +377,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 return;
 
             if (singleReq != null) {
-                req = singleReq.processPrimaryResponse(nodeId);
+                req = singleReq.processPrimaryResponse(nodeId, res);
 
                 if (req == null)
                     return;
@@ -393,7 +393,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 if (reqState == null)
                     return;
 
-                req = reqState.processPrimaryResponse(nodeId);
+                req = reqState.processPrimaryResponse(nodeId, res);
 
                 if (req != null) {
                     if (reqState.onPrimaryResponse(res, cctx)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index e2646a3..3ee6a61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -101,6 +101,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @GridToStringInclude
     private List<UUID> dhtNodes;
 
+    /** */
+    @GridDirectTransient
+    private boolean nodeLeft;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -114,13 +118,24 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param futId Future ID.
      * @param addDepInfo Deployment info flag.
      */
-    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, long futId, boolean addDepInfo) {
+    public GridNearAtomicUpdateResponse(int cacheId,
+        UUID nodeId,
+        long futId,
+        int partId,
+        boolean nodeLeft,
+        boolean addDepInfo) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futId = futId;
+        this.partId = partId;
+        this.nodeLeft = nodeLeft;
         this.addDepInfo = addDepInfo;
     }
 
+    public boolean nodeLeftResponse() {
+        return nodeLeft;
+    }
+
     /** {@inheritDoc} */
     @Override public int lookupIndex() {
         return CACHE_MSG_IDX;
@@ -162,13 +177,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
-     * @param partId Partition ID for proper striping on near node.
-     */
-    public void partition(int partId) {
-        this.partId = partId;
-    }
-
-    /**
      * Sets update error.
      *
      * @param err Error.

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 075be0e..3b038bd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -81,6 +81,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
         cfg.setConsistentId(gridName);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setMaxMissedClientHeartbeats(1000);
 
         TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
 
@@ -631,6 +632,121 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPutMissedDhtRequest_UnstableTopology() throws Exception {
+        blockRebalance = true;
+
+        ccfg = cacheConfiguration(1, FULL_SYNC);
+
+        startServers(4);
+
+        client = true;
+
+        Ignite client = startGrid(4);
+
+        IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+        IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+        testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+            @Override public boolean apply(GridIoMessage msg) {
+                return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+            }
+        });
+
+        Integer key = primaryKey(ignite(0).cache(TEST_CACHE));
+
+        log.info("Start put [key=" + key + ']');
+
+        nearAsyncCache.put(key, key);
+        IgniteFuture<?> fut = nearAsyncCache.future();
+
+        U.sleep(500);
+
+        assertFalse(fut.isDone());
+
+        stopGrid(0);
+
+        fut.get();
+
+        checkData(F.asMap(key, key));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllMissedDhtRequest_UnstableTopology1() throws Exception {
+        putAllMissedDhtRequest_UnstableTopology(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllMissedDhtRequest_UnstableTopology2() throws Exception {
+        putAllMissedDhtRequest_UnstableTopology(true, true);
+    }
+
+    /**
+     * @param fail0 Fail node 0 flag.
+     * @param fail1 Fail node 1 flag.
+     * @throws Exception If failed.
+     */
+    private void putAllMissedDhtRequest_UnstableTopology(boolean fail0, boolean fail1) throws Exception {
+        blockRebalance = true;
+
+        ccfg = cacheConfiguration(1, FULL_SYNC);
+
+        startServers(4);
+
+        client = true;
+
+        Ignite client = startGrid(4);
+
+        IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+        IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+        if (fail0) {
+            testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+                @Override public boolean apply(GridIoMessage msg) {
+                    return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+                }
+            });
+        }
+        if (fail1) {
+            testSpi(ignite(2)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+                @Override public boolean apply(GridIoMessage msg) {
+                    return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+                }
+            });
+        }
+
+        Integer key1 = primaryKey(ignite(0).cache(TEST_CACHE));
+        Integer key2 = primaryKey(ignite(2).cache(TEST_CACHE));
+
+        log.info("Start put [key1=" + key1 + ", key2=" + key1 + ']');
+
+        Map<Integer, Integer> map = new HashMap<>();
+        map.put(key1, 10);
+        map.put(key2, 20);
+
+        nearAsyncCache.putAll(map);
+        IgniteFuture<?> fut = nearAsyncCache.future();
+
+        U.sleep(500);
+
+        assertFalse(fut.isDone());
+
+        if (fail0)
+            stopGrid(0);
+        if (fail1)
+            stopGrid(2);
+
+        fut.get();
+
+        checkData(map);
+    }
+
+    /**
      * @param expData Expected cache data.
      */
     private void checkData(Map<Integer, Integer> expData) {