You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/07 14:39:09 UTC
[2/3] ignite git commit: ignite-4705
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28d4779b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28d4779b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28d4779b
Branch: refs/heads/ignite-4705-debug
Commit: 28d4779b318a688a9d36371c7d1d5ab7b254b5d5
Parents: 2b38fd9
Author: sboikov <sb...@gridgain.com>
Authored: Tue Mar 7 17:07:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Mar 7 17:36:15 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 8 ++
.../dht/atomic/GridDhtAtomicCache.java | 14 ++-
.../GridNearAtomicAbstractUpdateFuture.java | 41 +++++--
.../GridNearAtomicAbstractUpdateRequest.java | 8 ++
.../GridNearAtomicSingleUpdateFuture.java | 4 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +-
.../atomic/GridNearAtomicUpdateResponse.java | 24 ++--
.../atomic/IgniteCacheAtomicProtocolTest.java | 116 +++++++++++++++++++
8 files changed, 196 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 50820ae..a1b94a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -590,6 +590,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
ctx.cacheId(),
nodeId,
req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.error(req.classError());
@@ -768,6 +770,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
ctx.cacheId(),
nodeId,
req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.error(req.classError());
@@ -784,6 +788,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
ctx.cacheId(),
nodeId,
req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.error(req.classError());
@@ -800,6 +806,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
ctx.cacheId(),
nodeId,
req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.error(req.classError());
http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d6fdd10..f216a3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1708,6 +1708,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId,
req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.addFailedKeys(req.keys(), e);
@@ -1736,11 +1738,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureId(),
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+ nodeId,
+ req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
- res.partition(req.partition());
-
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -3083,10 +3087,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId,
checkReq.futureId(),
+ checkReq.partition(),
+ false,
false);
- res.partition(checkReq.partition());
-
GridCacheReturn ret = new GridCacheReturn(false, true);
res.returnValue(ret);
http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 6e98502..8d97732 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -358,6 +358,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
req.nodeId(),
req.futureId(),
+ req.partition(),
+ true,
cctx.deploymentEnabled());
ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
@@ -378,6 +380,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
req.nodeId(),
req.futureId(),
+ req.partition(),
+ e instanceof ClusterTopologyCheckedException,
cctx.deploymentEnabled());
res.addFailedKeys(req.keys(), e);
@@ -387,9 +391,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
final void onSendError(GridNearAtomicCheckUpdateRequest req, IgniteCheckedException e) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.updateRequest().nodeId(),
- req.futureId(),
- cctx.deploymentEnabled());
+ req.updateRequest().nodeId(),
+ req.futureId(),
+ req.partition(),
+ e instanceof ClusterTopologyCheckedException,
+ cctx.deploymentEnabled());
res.addFailedKeys(req.updateRequest().keys(), e);
@@ -497,17 +503,35 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/**
- * @param nodeId Node ID.
- * @return Request if need process primary response, {@code null} otherwise.
+ * @return Request if need process primary fail response, {@code null} otherwise.
*/
- @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId) {
+ @Nullable GridNearAtomicAbstractUpdateRequest onPrimaryFail() {
if (finished())
return null;
- if (req != null && req.nodeId().equals(nodeId) && req.response() == null)
+ if (req.fullSync() && !req.nodeFailedResponse()) {
+ req.resetResponse();
+
return req;
+ }
+
+ return req.response() == null ? req : null;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return Request if need process primary response, {@code null} otherwise.
+ */
+ @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ assert req.nodeId().equals(nodeId);
+
+ if (res.nodeLeftResponse())
+ return onPrimaryFail();
+
+ if (finished())
+ return null;
- return null;
+ return req.response() == null ? req : null;
}
/**
@@ -558,6 +582,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/**
* @param res Response.
+ * @param cctx Cache context.
* @return {@code True} if request processing finished.
*/
boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 4f288b1..23301c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -265,6 +265,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
return false;
}
+ void resetResponse() {
+ this.res = null;
+ }
+
/**
* @return Response.
*/
@@ -272,6 +276,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
return res;
}
+ boolean nodeFailedResponse() {
+ return res != null && res.nodeLeftResponse();
+ }
+
/**
* @return Topology locked flag.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 6b88bcf..f69d31d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -146,7 +146,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
return false;
if (reqState.req.nodeId.equals(nodeId)) {
- GridNearAtomicAbstractUpdateRequest req = reqState.processPrimaryResponse(nodeId);
+ GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail();
if (req != null) {
GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
@@ -261,7 +261,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (futId == null || futId != res.futureId())
return;
- req = reqState.processPrimaryResponse(nodeId);
+ req = reqState.processPrimaryResponse(nodeId, res);
if (req == null)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 60caa14..a44ccf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -172,7 +172,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (singleReq != null) {
if (singleReq.req.nodeId.equals(nodeId)) {
- GridNearAtomicAbstractUpdateRequest req = singleReq.processPrimaryResponse(nodeId);
+ GridNearAtomicAbstractUpdateRequest req = singleReq.onPrimaryFail();
if (req != null) {
rcvAll = true;
@@ -211,7 +211,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean reqDone = false;
if (e.getKey().equals(nodeId)) {
- GridNearAtomicAbstractUpdateRequest req = reqState.processPrimaryResponse(nodeId);
+ GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail();
if (req != null) {
reqDone = true;
@@ -377,7 +377,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
if (singleReq != null) {
- req = singleReq.processPrimaryResponse(nodeId);
+ req = singleReq.processPrimaryResponse(nodeId, res);
if (req == null)
return;
@@ -393,7 +393,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (reqState == null)
return;
- req = reqState.processPrimaryResponse(nodeId);
+ req = reqState.processPrimaryResponse(nodeId, res);
if (req != null) {
if (reqState.onPrimaryResponse(res, cctx)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index e2646a3..3ee6a61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -101,6 +101,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
@GridToStringInclude
private List<UUID> dhtNodes;
+ /** */
+ @GridDirectTransient
+ private boolean nodeLeft;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -114,13 +118,24 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param futId Future ID.
* @param addDepInfo Deployment info flag.
*/
- public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, long futId, boolean addDepInfo) {
+ public GridNearAtomicUpdateResponse(int cacheId,
+ UUID nodeId,
+ long futId,
+ int partId,
+ boolean nodeLeft,
+ boolean addDepInfo) {
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futId = futId;
+ this.partId = partId;
+ this.nodeLeft = nodeLeft;
this.addDepInfo = addDepInfo;
}
+ public boolean nodeLeftResponse() {
+ return nodeLeft;
+ }
+
/** {@inheritDoc} */
@Override public int lookupIndex() {
return CACHE_MSG_IDX;
@@ -162,13 +177,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
- * @param partId Partition ID for proper striping on near node.
- */
- public void partition(int partId) {
- this.partId = partId;
- }
-
- /**
* Sets update error.
*
* @param err Error.
http://git-wip-us.apache.org/repos/asf/ignite/blob/28d4779b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 075be0e..3b038bd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -81,6 +81,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
cfg.setConsistentId(gridName);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setMaxMissedClientHeartbeats(1000);
TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
@@ -631,6 +632,121 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testPutMissedDhtRequest_UnstableTopology() throws Exception {
+ blockRebalance = true;
+
+ ccfg = cacheConfiguration(1, FULL_SYNC);
+
+ startServers(4);
+
+ client = true;
+
+ Ignite client = startGrid(4);
+
+ IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+ @Override public boolean apply(GridIoMessage msg) {
+ return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+ }
+ });
+
+ Integer key = primaryKey(ignite(0).cache(TEST_CACHE));
+
+ log.info("Start put [key=" + key + ']');
+
+ nearAsyncCache.put(key, key);
+ IgniteFuture<?> fut = nearAsyncCache.future();
+
+ U.sleep(500);
+
+ assertFalse(fut.isDone());
+
+ stopGrid(0);
+
+ fut.get();
+
+ checkData(F.asMap(key, key));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllMissedDhtRequest_UnstableTopology1() throws Exception {
+ putAllMissedDhtRequest_UnstableTopology(true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllMissedDhtRequest_UnstableTopology2() throws Exception {
+ putAllMissedDhtRequest_UnstableTopology(true, true);
+ }
+
+ /**
+ * @param fail0 Fail node 0 flag.
+ * @param fail1 Fail node 1 flag.
+ * @throws Exception If failed.
+ */
+ private void putAllMissedDhtRequest_UnstableTopology(boolean fail0, boolean fail1) throws Exception {
+ blockRebalance = true;
+
+ ccfg = cacheConfiguration(1, FULL_SYNC);
+
+ startServers(4);
+
+ client = true;
+
+ Ignite client = startGrid(4);
+
+ IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
+ IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
+
+ if (fail0) {
+ testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+ @Override public boolean apply(GridIoMessage msg) {
+ return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+ }
+ });
+ }
+ if (fail1) {
+ testSpi(ignite(2)).blockMessages(new IgnitePredicate<GridIoMessage>() {
+ @Override public boolean apply(GridIoMessage msg) {
+ return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+ }
+ });
+ }
+
+ Integer key1 = primaryKey(ignite(0).cache(TEST_CACHE));
+ Integer key2 = primaryKey(ignite(2).cache(TEST_CACHE));
+
+ log.info("Start put [key1=" + key1 + ", key2=" + key1 + ']');
+
+ Map<Integer, Integer> map = new HashMap<>();
+ map.put(key1, 10);
+ map.put(key2, 20);
+
+ nearAsyncCache.putAll(map);
+ IgniteFuture<?> fut = nearAsyncCache.future();
+
+ U.sleep(500);
+
+ assertFalse(fut.isDone());
+
+ if (fail0)
+ stopGrid(0);
+ if (fail1)
+ stopGrid(2);
+
+ fut.get();
+
+ checkData(map);
+ }
+
+ /**
* @param expData Expected cache data.
*/
private void checkData(Map<Integer, Integer> expData) {