You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/12/24 11:58:09 UTC
[06/50] [abbrv] ignite git commit: ignite-1.5 Fixed client discovery
impl to skip node failed message processing while disconnected.
ignite-1.5 Fixed client discovery impl to skip node failed message processing while disconnected.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4687d9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4687d9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4687d9f
Branch: refs/heads/master
Commit: d4687d9f636b38736d327351ca4b22c3262a2ae8
Parents: 58b55b5
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 21 10:19:51 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 21 10:19:51 2015 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 4 +-
.../dht/preloader/GridDhtPreloader.java | 29 ------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 48 +++++++++++---------
.../IgniteClientReconnectCacheTest.java | 26 +++++++++--
.../cache/IgniteCachePutAllRestartTest.java | 2 +-
5 files changed, 53 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 92d66d7..72a2bef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1641,7 +1641,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (cache == null) {
throw new IgniteException("Failed to resolve nodes topology [cacheName=" + cacheName +
- ", topVer=" + topVer + ", history=" + discoCacheHist.keySet() +
+ ", topVer=" + topVer +
+ ", history=" + discoCacheHist.keySet() +
+ ", snap=" + snap +
", locNode=" + ctx.discovery().localNode() + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index c46a66c..f0054e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -92,9 +91,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** */
private GridDhtPartitionTopology top;
- /** Topology version. */
- private final GridAtomicLong topVer = new GridAtomicLong();
-
/** Force key futures. */
private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
@@ -149,11 +145,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
"order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
- boolean set = topVer.setIfGreater(e.topologyVersion());
-
- assert set : "Have you configured TcpDiscoverySpi for your in-memory data grid? [newVer=" +
- e.topologyVersion() + ", curVer=" + topVer.get() + ", evt=" + e + ']';
-
if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
fut.onNodeLeft(e.eventNode().id());
@@ -238,20 +229,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("DHT rebalancer onKernalStart callback.");
-
- ClusterNode loc = cctx.localNode();
-
- assert loc.metrics().getStartTime() > 0;
-
- final long startTopVer = loc.order();
-
- topVer.setIfGreater(startTopVer);
- }
-
- /** {@inheritDoc} */
@Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
super.preloadPredicate(preloadPred);
@@ -382,12 +359,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void onReconnected() {
startFut = new GridFutureAdapter<>();
-
- long topVer0 = cctx.kernalContext().discovery().topologyVersion();
-
- assert topVer0 > 0 : topVer0;
-
- topVer.set(topVer0);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 8f6c8a9..850cc24 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1828,36 +1828,42 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- if (!getLocalNodeId().equals(msg.creatorNodeId())) {
- TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
+ if (nodeAdded()) {
+ if (!getLocalNodeId().equals(msg.creatorNodeId())) {
+ TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
- return;
- }
+ return;
+ }
- Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
+ Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
- if (state != CONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Discarding node failed message (join process is not finished): " + msg);
+ if (state != CONNECTED) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node failed message (join process is not finished): " + msg);
- return;
- }
+ return;
+ }
- if (msg.warning() != null) {
- ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId());
+ if (msg.warning() != null) {
+ ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId());
- U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
- "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
- ", msg=" + msg.warning() + ']');
- }
+ U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
+ "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
+ ", msg=" + msg.warning() + ']');
+ }
- notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
+ notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
- spi.stats.onNodeFailed();
+ spi.stats.onNodeFailed();
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Ignore topology message, local node not added to topology: " + msg);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 5234d6e..ad6c46f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -1088,7 +1088,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
clientMode = true;
- final int CLIENTS = 2;
+ final int CLIENTS = 5;
List<Ignite> clients = new ArrayList<>();
@@ -1103,12 +1103,14 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
int nodes = SRV_CNT + CLIENTS;
int srvNodes = SRV_CNT;
- for (int iter = 0; iter < 3; iter++) {
+ for (int iter = 0; iter < 5; iter++) {
log.info("Iteration: " + iter);
reconnectClientNodes(log, clients, grid(0), null);
- for (Ignite client : clients) {
+ final int expNodes = CLIENTS + srvNodes;
+
+ for (final Ignite client : clients) {
IgniteCache<Object, Object> cache = client.cache(null);
assertNotNull(cache);
@@ -1117,6 +1119,14 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
assertEquals(1, cache.get(client.name()));
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ ClusterGroup grp = client.cluster().forCacheNodes(null);
+
+ return grp.nodes().size() == expNodes;
+ }
+ }, 5000);
+
ClusterGroup grp = client.cluster().forCacheNodes(null);
assertEquals(CLIENTS + srvNodes, grp.nodes().size());
@@ -1127,7 +1137,15 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}
for (int i = 0; i < nodes; i++) {
- Ignite ignite = grid(i);
+ final Ignite ignite = grid(i);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ ClusterGroup grp = ignite.cluster().forCacheNodes(null);
+
+ return grp.nodes().size() == expNodes;
+ }
+ }, 5000);
ClusterGroup grp = ignite.cluster().forCacheNodes(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
index 3e124f3..96a396c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
@@ -121,7 +121,7 @@ public class IgniteCachePutAllRestartTest extends GridCommonAbstractTest {
iter++;
- if (iter % 10 == 0)
+ if (iter % 1000 == 0)
log.info("Iteration: " + iter);
}