You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/22 14:38:14 UTC
ignite git commit: ignite-1524 Fixed processing of ClientReconnectMessage
Repository: ignite
Updated Branches:
refs/heads/ignite-1524 [created] 919749cde
ignite-1524 Fixed processing of ClientReconnectMessage
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/919749cd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/919749cd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/919749cd
Branch: refs/heads/ignite-1524
Commit: 919749cde3e0c7e01c1a3b160807865a383ed634
Parents: 72c3eef
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 15:32:55 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 15:32:55 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 97 +++++++++++---------
...lientDiscoverySpiFailureTimeoutSelfTest.java | 33 +++++--
2 files changed, 81 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/919749cd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 4ce46e8..8a205d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2958,70 +2958,81 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Client reconnect message.
*/
private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+ UUID nodeId = msg.creatorNodeId();
+
UUID locNodeId = getLocalNodeId();
boolean isLocNodeRouter = locNodeId.equals(msg.routerNodeId());
if (!msg.verified()) {
- assert isLocNodeRouter;
-
- msg.verify(locNodeId);
+ TcpDiscoveryNode node = ring.node(nodeId);
- if (ring.hasRemoteNodes()) {
- sendMessageAcrossRing(msg);
+ assert node == null || node.isClient();
- return;
+ if (node != null) {
+ node.clientRouterNodeId(msg.routerNodeId());
+ node.aliveCheck(spi.maxMissedClientHbs);
}
- }
-
- UUID nodeId = msg.creatorNodeId();
- TcpDiscoveryNode node = ring.node(nodeId);
+ if (isLocalNodeCoordinator()) {
+ msg.verify(locNodeId);
- assert node == null || node.isClient();
+ if (node != null) {
+ Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
- if (node != null) {
- assert node.isClient();
+ if (pending != null) {
+ msg.pendingMessages(pending);
+ msg.success(true);
- node.clientRouterNodeId(msg.routerNodeId());
- node.aliveCheck(spi.maxMissedClientHbs);
+ if (log.isDebugEnabled())
+ log.debug("Accept client reconnect, restored pending messages " +
+ "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Failing reconnecting client node because failed to restore pending " +
+ "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
- if (isLocalNodeCoordinator()) {
- Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
+ processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
+ node.id(), node.internalOrder()));
+ }
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
- if (pending != null) {
- msg.pendingMessages(pending);
- msg.success(true);
+ if (isLocNodeRouter) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
- if (log.isDebugEnabled())
- log.debug("Accept client reconnect, restored pending messages " +
- "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+ if (wrk != null)
+ wrk.addMessage(msg);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+ locNodeId + ", clientNodeId=" + nodeId + ']');
}
else {
- if (log.isDebugEnabled())
- log.debug("Failing reconnecting client node because failed to restore pending " +
- "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
-
- processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId,
- node.id(), node.internalOrder()));
+ if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
}
}
- }
- else if (log.isDebugEnabled())
- log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
-
- if (isLocNodeRouter) {
- ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
- if (wrk != null)
- wrk.addMessage(msg);
- else if (log.isDebugEnabled())
- log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
- locNodeId + ", clientNodeId=" + nodeId + ']');
+ else {
+ if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
+ }
}
else {
- if (ring.hasRemoteNodes())
- sendMessageAcrossRing(msg);
+ if (isLocNodeRouter) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+ if (wrk != null)
+ wrk.addMessage(msg);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+ locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else {
+ if (ring.hasRemoteNodes() && !locNodeId.equals(msg.verifierNodeId()))
+ sendMessageAcrossRing(msg);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/919749cd/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 14417c1..344efc0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -170,11 +172,26 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
}
/**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnCoordinatorRouterFail1() throws Exception {
+ clientReconnectOnCoordinatorRouterFail(1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnCoordinatorRouterFail2() throws Exception {
+ clientReconnectOnCoordinatorRouterFail(2);
+ }
+
+ /**
* Test tries to provoke scenario when client sends reconnect message before router failure detected.
*
+ * @param srvNodes Number of additional server nodes.
* @throws Exception If failed.
*/
- public void _testClientReconnectOnCoordinatorRouterFail() throws Exception {
+ public void clientReconnectOnCoordinatorRouterFail(int srvNodes) throws Exception {
startServerNodes(1);
Ignite srv = G.ignite("server-0");
@@ -189,24 +206,28 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
Collections.singleton("localhost:" + srvNode.discoveryPort() + ".." + (srvNode.discoveryPort() + 1)));
failureThreshold = 1000L;
- netTimeout = 500L;
+ netTimeout = 1000L;
startClientNodes(1); // Client should connect to coordinator.
failureThreshold = 10_000L;
netTimeout = 5000L;
- for (int i = 0; i < 2; i++) {
+ List<String> nodes = new ArrayList<>();
+
+ for (int i = 0; i < srvNodes; i++) {
Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+ nodes.add(g.name());
+
srvNodeIds.add(g.cluster().localNode().id());
}
- checkNodes(3, 1);
+ checkNodes(1 + srvNodes, 1);
- final CountDownLatch latch = new CountDownLatch(3);
+ nodes.add("client-0");
- String nodes[] = {"server-1", "server-2", "client-0"};
+ final CountDownLatch latch = new CountDownLatch(nodes.size());
final AtomicBoolean err = new AtomicBoolean();