You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/08/15 03:49:08 UTC
[21/35] incubator-ignite git commit: # ignite-1229: stop ping process
when node left topology
# ignite-1229: stop ping process when node left topology
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d5986c26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d5986c26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d5986c26
Branch: refs/heads/ignite-264
Commit: d5986c265c9f68c2a98c48d4ba75444fad9e6725
Parents: ae11e9b
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 12 11:42:22 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 12 11:42:22 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 91 +++++++++++++-------
1 file changed, 60 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d5986c26/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 92c21ed..76144e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -388,12 +388,15 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryNode node = ring.node(nodeId);
- if (node == null || !node.visible())
+ if (node == null)
+ return false;
+
+ if (!nodeAlive(nodeId))
return false;
boolean res = pingNode(node);
- if (!res && !node.isClient()) {
+ if (!res && !node.isClient() && nodeAlive(nodeId)) {
LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId);
msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id()));
@@ -421,14 +424,18 @@ class ServerImpl extends TcpDiscoveryImpl {
node = ring.node(node.clientRouterNodeId());
- if (node == null || !node.visible())
+ if (node == null || !nodeAlive(node.id()))
return false;
}
for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) {
try {
// ID returned by the node should be the same as ID of the parameter for ping to succeed.
- IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
+ IgniteBiTuple<UUID, Boolean> t = pingNode(addr, node.id(), clientNodeId);
+
+ if (t == null)
+ // Remote node left topology.
+ return false;
boolean res = node.id().equals(t.get1()) && (clientNodeId == null || t.get2());
@@ -453,12 +460,14 @@ class ServerImpl extends TcpDiscoveryImpl {
* Pings the node by its address to see if it's alive.
*
* @param addr Address of the node.
+ * @param nodeId Node ID to ping. In case when client node ID is not null this node ID is an ID of the router node.
* @param clientNodeId Client node ID.
- * @return ID of the remote node and "client exists" flag if node alive.
+ * @return ID of the remote node and "client exists" flag if node alive or {@code null} if the remote node has
+ * left a topology during the ping process.
* @throws IgniteCheckedException If an error occurs.
*/
- private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId)
- throws IgniteCheckedException {
+ private @Nullable IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID nodeId,
+ @Nullable UUID clientNodeId) throws IgniteCheckedException {
assert addr != null;
UUID locNodeId = getLocalNodeId();
@@ -537,6 +546,16 @@ class ServerImpl extends TcpDiscoveryImpl {
return t;
}
catch (IOException | IgniteCheckedException e) {
+ if (nodeId != null && !nodeAlive(nodeId)) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to ping the node (has left or leaving topology): [nodeId=" + nodeId +
+ ']');
+
+ fut.onDone((IgniteBiTuple<UUID, Boolean>)null);
+
+ return null;
+ }
+
if (errs == null)
errs = new ArrayList<>();
@@ -615,6 +634,28 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Checks whether a node is alive or not.
+ *
+ * @param nodeId Node ID.
+ * @return {@code True} if node is in the ring and is not being removed from.
+ */
+ private boolean nodeAlive(UUID nodeId) {
+ // Is node alive or about to be removed from the ring?
+ TcpDiscoveryNode node = ring.node(nodeId);
+
+ boolean nodeAlive = node != null && node.visible();
+
+ if (nodeAlive) {
+ synchronized (mux) {
+ nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) &&
+ !F.transform(leavingNodes, F.node2id()).contains(nodeId);
+ }
+ }
+
+ return nodeAlive;
+ }
+
+ /**
* Tries to join this node to topology.
*
* @throws IgniteSpiException If any error occurs.
@@ -1520,7 +1561,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (res == null) {
try {
- res = pingNode(addr, null).get1() != null;
+ res = pingNode(addr, null, null).get1() != null;
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -3775,9 +3816,17 @@ class ServerImpl extends TcpDiscoveryImpl {
else {
int aliveCheck = clientNode.decrementAliveCheck();
- if (aliveCheck <= 0 && isLocalNodeCoordinator() && !failedNodes.contains(clientNode))
- processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
- clientNode.id(), clientNode.internalOrder()));
+ if (aliveCheck <= 0 && isLocalNodeCoordinator()) {
+ boolean failedNode;
+
+ synchronized (mux) {
+ failedNode = failedNodes.contains(clientNode);
+ }
+
+ if (!failedNode)
+ processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
+ clientNode.id(), clientNode.internalOrder()));
+ }
}
}
}
@@ -4689,26 +4738,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * @param nodeId Node ID.
- * @return {@code True} if node is in the ring and is not being removed from.
- */
- private boolean nodeAlive(UUID nodeId) {
- // Is node alive or about to be removed from the ring?
- TcpDiscoveryNode node = ring.node(nodeId);
-
- boolean nodeAlive = node != null && node.visible();
-
- if (nodeAlive) {
- synchronized (mux) {
- nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) &&
- !F.transform(leavingNodes, F.node2id()).contains(nodeId);
- }
- }
-
- return nodeAlive;
- }
-
- /**
* @param msg Join request message.
* @param clientMsgWrk Client message worker to start.
* @return Whether connection was successful.