You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/07/01 03:36:38 UTC
[15/50] incubator-ignite git commit: # ignite-883 issues with client
connect/reconnect
# ignite-883 issues with client connect/reconnect
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/efa92c54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/efa92c54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/efa92c54
Branch: refs/heads/ignite-973-2
Commit: efa92c54ab07d7f72b9c83aa3a09e03627d72e4a
Parents: 3e8ddb4
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 26 11:06:41 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 26 11:06:41 2015 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 6 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 151 ++++++-----
.../ignite/spi/discovery/tcp/ServerImpl.java | 103 +++++--
.../spi/discovery/tcp/TcpDiscoverySpi.java | 3 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 265 ++++++++++++++++++-
5 files changed, 448 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index edd0ad7..af87685 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -826,7 +826,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
updated |= top.update(null, entry.getValue()) != null;
}
- if (updated)
+ if (!cctx.kernalContext().clientNode() && updated)
refreshPartitions();
}
else
@@ -985,7 +985,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// If not first preloading and no more topology events present,
// then we periodically refresh partition map.
- if (futQ.isEmpty() && preloadFinished) {
+ if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) {
refreshPartitions(timeout);
timeout = cctx.gridConfig().getNetworkTimeout();
@@ -1051,7 +1051,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
startEvtFired = true;
- if (changed && futQ.isEmpty())
+ if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty())
refreshPartitions();
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 0c2c059..04276d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1123,7 +1123,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (joinLatch.getCount() > 0) {
joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
"join request (consider increasing 'joinTimeout' configuration property) " +
- "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock +']'));
+ "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
break;
}
@@ -1282,17 +1282,21 @@ class ClientImpl extends TcpDiscoveryImpl {
"[msg=" + msg + ", locNode=" + locNode + ']');
}
else {
- boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;
+ if (nodeAdded()) {
+ boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;
- if (topChanged) {
- if (log.isDebugEnabled())
- log.debug("Added new node to topology: " + node);
+ if (topChanged) {
+ if (log.isDebugEnabled())
+ log.debug("Added new node to topology: " + node);
- Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+ Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
- if (data != null)
- spi.onExchange(newNodeId, newNodeId, data, null);
+ if (data != null)
+ spi.onExchange(newNodeId, newNodeId, data, null);
+ }
}
+ else if (log.isDebugEnabled())
+ log.debug("Ignore topology message, local node not added to topology: " + msg);
}
}
@@ -1332,54 +1336,58 @@ class ClientImpl extends TcpDiscoveryImpl {
"[msg=" + msg + ", locNode=" + locNode + ']');
}
else {
- TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
+ if (nodeAdded()) {
+ TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
- return;
- }
+ return;
+ }
- boolean evt = false;
+ boolean evt = false;
- long topVer = msg.topologyVersion();
+ long topVer = msg.topologyVersion();
- assert topVer > 0 : msg;
+ assert topVer > 0 : msg;
- if (!node.visible()) {
- node.order(topVer);
- node.visible(true);
+ if (!node.visible()) {
+ node.order(topVer);
+ node.visible(true);
- if (spi.locNodeVer.equals(node.version()))
- node.version(spi.locNodeVer);
+ if (spi.locNodeVer.equals(node.version()))
+ node.version(spi.locNodeVer);
- evt = true;
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Skip node join event, node already joined [msg=" + msg + ", node=" + node + ']');
+ evt = true;
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skip node join event, node already joined [msg=" + msg + ", node=" + node + ']');
- assert node.order() == topVer : node;
- }
+ assert node.order() == topVer : node;
+ }
- Collection<ClusterNode> top = updateTopologyHistory(topVer, msg);
+ Collection<ClusterNode> top = updateTopologyHistory(topVer, msg);
- assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg +
- ", node=" + node + ", top=" + top + ']';
+ assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg +
+ ", node=" + node + ", top=" + top + ']';
- if (!pending && joinLatch.getCount() > 0) {
- if (log.isDebugEnabled())
- log.debug("Discarding node add finished message (join process is not finished): " + msg);
+ if (!pending && joinLatch.getCount() > 0) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node add finished message (join process is not finished): " + msg);
- return;
- }
+ return;
+ }
- if (evt) {
- notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
+ if (evt) {
+ notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
- spi.stats.onNodeJoined();
+ spi.stats.onNodeJoined();
+ }
}
+ else if (log.isDebugEnabled())
+ log.debug("Ignore topology message, local node not added to topology: " + msg);
}
}
@@ -1397,31 +1405,42 @@ class ClientImpl extends TcpDiscoveryImpl {
if (spi.getSpiContext().isStopping())
return;
- TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
+ if (nodeAdded()) {
+ TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
- return;
- }
+ return;
+ }
- Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
+ Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg);
- if (!pending && joinLatch.getCount() > 0) {
- if (log.isDebugEnabled())
- log.debug("Discarding node left message (join process is not finished): " + msg);
+ if (!pending && joinLatch.getCount() > 0) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node left message (join process is not finished): " + msg);
- return;
- }
+ return;
+ }
- notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
+ notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
- spi.stats.onNodeLeft();
+ spi.stats.onNodeLeft();
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Ignore topology message, local node not added to topology: " + msg);
}
}
/**
+ * @return {@code True} if received node added message for local node.
+ */
+ private boolean nodeAdded() {
+ return !topHist.isEmpty();
+ }
+
+ /**
* @param msg Message.
*/
private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
@@ -1514,9 +1533,9 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
if (getLocalNodeId().equals(msg.creatorNodeId())) {
- assert msg.success() : msg;
-
if (reconnector != null) {
+ assert msg.success() : msg;
+
currSock = reconnector.sock;
sockWriter.setSocket(currSock);
@@ -1529,7 +1548,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
if (log.isDebugEnabled())
- log.debug("Process message on reconnect [msg=" + pendingMsg + ']');
+ log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']');
processDiscoveryMessage(pendingMsg);
}
@@ -1538,8 +1557,22 @@ class ClientImpl extends TcpDiscoveryImpl {
pending = false;
}
}
- else if (log.isDebugEnabled())
- log.debug("Discarding reconnect message, reconnect is completed: " + msg);
+ else {
+ if (joinLatch.getCount() > 0) {
+ if (msg.success()) {
+ for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) {
+ if (log.isDebugEnabled())
+ log.debug("Process pending message on connect [msg=" + pendingMsg + ']');
+
+ processDiscoveryMessage(pendingMsg);
+ }
+
+ assert joinLatch.getCount() == 0 : msg;
+ }
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Discarding reconnect message, reconnect is completed: " + msg);
+ }
}
else if (log.isDebugEnabled())
log.debug("Discarding reconnect message for another client: " + msg);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2458f85..fa3e564 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2452,7 +2452,40 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- if (log.isDebugEnabled())
+ if (msg.client()) {
+ TcpDiscoveryClientReconnectMessage reconMsg = new TcpDiscoveryClientReconnectMessage(node.id(),
+ node.clientRouterNodeId(),
+ null);
+
+ reconMsg.verify(getLocalNodeId());
+
+ Collection<TcpDiscoveryAbstractMessage> msgs = msgHist.messages(null, node);
+
+ if (msgs != null) {
+ reconMsg.pendingMessages(msgs);
+
+ reconMsg.success(true);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Send reconnect message to already joined client " +
+ "[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
+
+ if (getLocalNodeId().equals(node.clientRouterNodeId())) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(node.id());
+
+ if (wrk != null)
+ wrk.addMessage(reconMsg);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to find client message worker " +
+ "[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
+ }
+ else {
+ if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(reconMsg);
+ }
+ }
+ else if (log.isDebugEnabled())
log.debug("Ignoring join request message since node is already in topology: " + msg);
return;
@@ -4104,15 +4137,44 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (req.client()) {
+ ClientMessageWorker clientMsgWrk0 = new ClientMessageWorker(sock, nodeId);
+
+ while (true) {
+ ClientMessageWorker old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
+
+ if (old == null)
+ break;
+
+ if (old.isInterrupted()) {
+ clientMsgWorkers.remove(nodeId, old);
+
+ continue;
+ }
+
+ old.join(500);
+
+ old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
+
+ if (old == null)
+ break;
+
+ if (log.isDebugEnabled())
+ log.debug("Already have client message worker, closing connection " +
+ "[locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId +
+ ", workerSock=" + old.sock +
+ ", sock=" + sock + ']');
+
+ return;
+ }
+
if (log.isDebugEnabled())
log.debug("Created client message worker [locNodeId=" + locNodeId +
", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
- clientMsgWrk = new ClientMessageWorker(sock, nodeId);
-
- clientMsgWrk.start();
+ assert clientMsgWrk0 == clientMsgWorkers.get(nodeId);
- clientMsgWorkers.put(nodeId, clientMsgWrk);
+ clientMsgWrk = clientMsgWrk0;
}
if (log.isDebugEnabled())
@@ -4188,7 +4250,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
if (!req.responded()) {
- boolean ok = processJoinRequestMessage(req);
+ boolean ok = processJoinRequestMessage(req, clientMsgWrk);
if (clientMsgWrk != null && ok)
continue;
@@ -4202,14 +4264,17 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState state = spiStateCopy();
if (state == CONNECTED) {
- spi.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK);
+
+ if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+ clientMsgWrk.start();
msgWorker.addMessage(msg);
continue;
}
else {
- spi.writeToSocket(sock, RES_CONTINUE_JOIN);
+ spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN);
break;
}
@@ -4217,7 +4282,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
// Send receipt back.
- spi.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK);
boolean ignored = false;
@@ -4246,7 +4311,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
// Send receipt back.
- spi.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK);
boolean ignored = false;
@@ -4275,7 +4340,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
// Send receipt back.
- spi.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK);
boolean ignored = false;
@@ -4304,7 +4369,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
// Send receipt back.
- spi.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK);
boolean ignored = false;
@@ -4346,7 +4411,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Send receipt back.
if (clientMsgWrk == null)
- spi.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK);
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -4435,24 +4500,29 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* @param msg Join request message.
+ * @param clientMsgWrk Client message worker to start.
* @return Whether connection was successful.
* @throws IOException If IO failed.
*/
@SuppressWarnings({"IfMayBeConditional"})
- private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) throws IOException {
+ private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg,
+ @Nullable ClientMessageWorker clientMsgWrk) throws IOException {
assert msg != null;
assert !msg.responded();
TcpDiscoverySpiState state = spiStateCopy();
if (state == CONNECTED) {
- spi.writeToSocket(sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
msg.responded(true);
+ if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+ clientMsgWrk.start();
+
msgWorker.addMessage(msg);
return true;
@@ -4477,7 +4547,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Local node is stopping. Remote node should try next one.
res = RES_CONTINUE_JOIN;
- spi.writeToSocket(sock, res);
+ spi.writeToSocket(msg, sock, res);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
@@ -4632,6 +4702,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * @return Ping result.
* @throws InterruptedException If interrupted.
*/
public boolean ping() throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 1d1916a..7663fe6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1227,12 +1227,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/**
* Writes response to the socket.
*
+ * @param msg Received message.
* @param sock Socket.
* @param res Integer response.
* @throws IOException If IO failed or write timed out.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- protected void writeToSocket(Socket sock, int res) throws IOException {
+ protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
assert sock != null;
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 8147958..ec6a526 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -24,7 +24,9 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
@@ -106,11 +108,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private int maxMissedClientHbs = TcpDiscoverySpi.DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
+ /** */
+ private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+ TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
disco.setMaxMissedClientHeartbeats(maxMissedClientHbs);
@@ -154,6 +159,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
disco.setJoinTimeout(joinTimeout);
disco.setNetworkTimeout(netTimeout);
+ disco.afterWrite(afterWrite);
+
cfg.setDiscoverySpi(disco);
if (nodeId != null)
@@ -1016,6 +1023,189 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testJoinError3() throws Exception {
+ startServerNodes(1);
+
+ Ignite ignite = G.ignite("server-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+
+ srvSpi.failNodeAddFinishedMessage();
+
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinErrorMissedAddFinishedMessage1() throws Exception {
+ missedAddFinishedMessage(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinErrorMissedAddFinishedMessage2() throws Exception {
+ missedAddFinishedMessage(false);
+ }
+
+ /**
+ * @param singleSrv If {@code true}, starts one server node; otherwise starts two.
+ * @throws Exception If failed.
+ */
+ private void missedAddFinishedMessage(boolean singleSrv) throws Exception {
+ int srvs = singleSrv ? 1 : 2;
+
+ startServerNodes(srvs);
+
+ afterWrite = new CIX2<TcpDiscoveryAbstractMessage, Socket>() {
+ private boolean first = true;
+
+ @Override public void applyx(TcpDiscoveryAbstractMessage msg, Socket sock) throws IgniteCheckedException {
+ if (first && (msg instanceof TcpDiscoveryJoinRequestMessage)) {
+ first = false;
+
+ log.info("Close socket after message write [msg=" + msg + "]");
+
+ try {
+ sock.close();
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ log.info("Delay after message write [msg=" + msg + "]");
+
+ U.sleep(5000); // Wait until the server processes the join request.
+ }
+ }
+ };
+
+ Ignite srv = singleSrv ? G.ignite("server-0") : G.ignite("server-1");
+
+ TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode();
+
+ assertEquals(singleSrv ? 1 : 2, srvNode.order());
+
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+ clientIpFinder.setAddresses(Collections.singleton("localhost:" + srvNode.discoveryPort()));
+
+ startClientNodes(1);
+
+ TcpDiscoveryNode clientNode = (TcpDiscoveryNode)G.ignite("client-0").cluster().localNode();
+
+ assertEquals(srvNode.id(), clientNode.clientRouterNodeId());
+
+ checkNodes(srvs, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientMessageWorkerStartSingleServer() throws Exception {
+ clientMessageWorkerStart(1, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientMessageWorkerStartTwoServers1() throws Exception {
+ clientMessageWorkerStart(2, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientMessageWorkerStartTwoServers2() throws Exception {
+ clientMessageWorkerStart(2, 2);
+ }
+
+ /**
+ * @param srvs Number of server nodes.
+ * @param connectTo Which server to connect to.
+ * @throws Exception If failed.
+ */
+ private void clientMessageWorkerStart(int srvs, int connectTo) throws Exception {
+ startServerNodes(srvs);
+
+ Ignite srv = G.ignite("server-" + (connectTo - 1));
+
+ final TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode();
+
+ assertEquals((long)connectTo, srvNode.order());
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+ final String client0 = "client-" + clientIdx.getAndIncrement();
+
+ srvSpi.delayJoinAckFor = client0;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+ clientIpFinder.setAddresses(Collections.singleton("localhost:" + srvNode.discoveryPort()));
+
+ Ignite client = startGrid(client0);
+
+ clientIpFinder = null;
+
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+ assertFalse(clientSpi.invalidResponse());
+
+ TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+
+ assertEquals(srvNode.id(), clientNode.clientRouterNodeId());
+
+ return null;
+ }
+ });
+
+ final String client1 = "client-" + clientIdx.getAndIncrement();
+
+ while (!fut.isDone()) {
+ startGrid(client1);
+
+ stopGrid(client1);
+ }
+
+ fut.get();
+
+ checkNodes(srvs, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoinMutlithreaded() throws Exception {
+ startServerNodes(1);
+
+ final int CLIENTS = 30;
+
+ clientsPerSrv = CLIENTS;
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+
+ clientNodeIds.add(g.cluster().localNode().id());
+
+ return null;
+ }
+ }, CLIENTS, "start-client");
+
+ checkNodes(1, CLIENTS);
+ }
+
+ /**
* @param clientIdx Client index.
* @param srvIdx Server index.
* @throws Exception In case of error.
@@ -1267,8 +1457,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private AtomicInteger failNodeAdded = new AtomicInteger();
/** */
+ private AtomicInteger failNodeAddFinished = new AtomicInteger();
+
+ /** */
private AtomicInteger failClientReconnect = new AtomicInteger();
+ /** */
+ private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+
+ /** */
+ private volatile boolean invalidRes;
+
+ /** */
+ private volatile String delayJoinAckFor;
+
/**
* @param lock Lock.
*/
@@ -1287,6 +1489,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * @param afterWrite After write callback.
+ */
+ void afterWrite(IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite) {
+ this.afterWrite = afterWrite;
+ }
+
+ /**
+ * @return {@code True} if received unexpected ack.
+ */
+ boolean invalidResponse() {
+ return invalidRes;
+ }
+
+ /**
*
*/
void failNodeAddedMessage() {
@@ -1296,6 +1512,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/**
*
*/
+ void failNodeAddFinishedMessage() {
+ failNodeAddFinished.set(1);
+ }
+
+ /**
+ *
+ */
void failClientReconnectMessage() {
failClientReconnect.set(1);
}
@@ -1322,6 +1545,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
if (msg instanceof TcpDiscoveryNodeAddedMessage)
fail = failNodeAdded.getAndDecrement() > 0;
+ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+ fail = failNodeAddFinished.getAndDecrement() > 0;
else if (msg instanceof TcpDiscoveryClientReconnectMessage)
fail = failClientReconnect.getAndDecrement() > 0;
@@ -1332,6 +1557,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
super.writeToSocket(sock, msg, bout);
+
+ if (afterWrite != null)
+ afterWrite.apply(msg, sock);
}
/** {@inheritDoc} */
@@ -1365,5 +1593,40 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
impl.workerThread().resume();
}
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+ if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) {
+ TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
+
+ if (delayJoinAckFor.equals(msg0.node().attribute(IgniteNodeAttributes.ATTR_GRID_NAME))) {
+ log.info("Delay response [sock=" + sock + ", msg=" + msg0 + ", res=" + res + ']');
+
+ delayJoinAckFor = null;
+
+ try {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ super.writeToSocket(msg, sock, res);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readReceipt(Socket sock, long timeout) throws IOException {
+ int res = super.readReceipt(sock, timeout);
+
+ if (res != TcpDiscoveryImpl.RES_OK) {
+ invalidRes = true;
+
+ log.info("Received unexpected response: " + res);
+ }
+
+ return res;
+ }
}
}