You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/10/30 12:27:08 UTC
[14/23] ignite git commit: ignite-5860 Try to process
TcpDiscoveryClientReconnectMessage from the socket reader instead of always
processing it on the coordinator.
ignite-5860 Try to process TcpDiscoveryClientReconnectMessage from the socket reader instead of always processing it on the coordinator.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56a63f80
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56a63f80
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56a63f80
Branch: refs/heads/ignite-6748
Commit: 56a63f80d1181e53a3e2a4c4f88e42226bbac86e
Parents: 717c549
Author: Denis Mekhanikov <dm...@gmail.com>
Authored: Fri Oct 27 14:12:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 27 14:13:40 2017 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 52 +++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 311 ++++++++++---------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 20 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 275 +++++++++++++++-
5 files changed, 467 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 5dbfe6e..139c110 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -26,6 +26,7 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -470,7 +471,8 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
- * @param recon {@code True} if reconnects.
+ * @param prevAddr If reconnect is in progress, then previous address of the router the client was connected to
+ * and {@code null} otherwise.
* @param timeout Timeout.
* @return Opened socket or {@code null} if timeout.
* @throws InterruptedException If interrupted.
@@ -478,9 +480,9 @@ class ClientImpl extends TcpDiscoveryImpl {
* @see TcpDiscoverySpi#joinTimeout
*/
@SuppressWarnings("BusyWait")
- @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout)
+ @Nullable private T2<SocketStream, Boolean> joinTopology(InetSocketAddress prevAddr, long timeout)
throws IgniteSpiException, InterruptedException {
- Collection<InetSocketAddress> addrs = null;
+ List<InetSocketAddress> addrs = null;
long startTime = U.currentTimeMillis();
@@ -489,7 +491,7 @@ class ClientImpl extends TcpDiscoveryImpl {
throw new InterruptedException();
while (addrs == null || addrs.isEmpty()) {
- addrs = spi.resolvedAddresses();
+ addrs = new ArrayList<>(spi.resolvedAddresses());
if (!F.isEmpty(addrs)) {
if (log.isDebugEnabled())
@@ -509,22 +511,30 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
- Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
+ // Process failed node last.
+ if (prevAddr != null) {
+ int idx = addrs.indexOf(prevAddr);
- Iterator<InetSocketAddress> it = addrs.iterator();
+ if (idx != -1)
+ Collections.swap(addrs, idx, 0);
+ }
+
+ Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
boolean wait = false;
- while (it.hasNext()) {
+ for (int i = addrs.size() - 1; i >= 0; i--) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
- InetSocketAddress addr = it.next();
+ InetSocketAddress addr = addrs.get(i);
+
+ boolean recon = prevAddr != null;
T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
if (sockAndRes == null) {
- it.remove();
+ addrs.remove(i);
continue;
}
@@ -852,8 +862,8 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override protected IgniteSpiThread workerThread() {
- return msgWorker;
+ @Override protected Collection<IgniteSpiThread> threads() {
+ return Arrays.asList(sockWriter, msgWorker);
}
/**
@@ -1336,15 +1346,20 @@ class ClientImpl extends TcpDiscoveryImpl {
private boolean clientAck;
/** */
- private boolean join;
+ private final boolean join;
+
+ /** */
+ private final InetSocketAddress prevAddr;
/**
* @param join {@code True} if reconnects during join.
+ * @param prevAddr Address of the node, that this client was previously connected to.
*/
- protected Reconnector(boolean join) {
+ protected Reconnector(boolean join, InetSocketAddress prevAddr) {
super(spi.ignite().name(), "tcp-client-disco-reconnector", log);
this.join = join;
+ this.prevAddr = prevAddr;
}
/**
@@ -1374,7 +1389,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
while (true) {
- T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout);
+ T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout);
if (joinRes == null) {
if (join) {
@@ -1609,6 +1624,10 @@ class ClientImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof SocketClosedMessage) {
if (((SocketClosedMessage)msg).sock == currSock) {
+ Socket sock = currSock.sock;
+
+ InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort());
+
currSock = null;
boolean join = joinLatch.getCount() > 0;
@@ -1637,8 +1656,7 @@ class ClientImpl extends TcpDiscoveryImpl {
assert reconnector == null;
- final Reconnector reconnector = new Reconnector(join);
- this.reconnector = reconnector;
+ reconnector = new Reconnector(join, prevAddr);
reconnector.start();
}
}
@@ -1811,7 +1829,7 @@ class ClientImpl extends TcpDiscoveryImpl {
T2<SocketStream, Boolean> joinRes;
try {
- joinRes = joinTopology(false, spi.joinTimeout);
+ joinRes = joinTopology(null, spi.joinTimeout);
}
catch (IgniteSpiException e) {
joinError(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index efe531a..1c3ec2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -219,6 +219,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
+ /** Messages history used for client reconnect. */
+ private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
+
/** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
private boolean ipFinderHasLocAddr;
@@ -1663,8 +1666,23 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override protected IgniteSpiThread workerThread() {
- return msgWorker;
+ @Override protected Collection<IgniteSpiThread> threads() {
+ Collection<IgniteSpiThread> threads;
+
+ synchronized (mux) {
+ threads = new ArrayList<>(readers.size() + clientMsgWorkers.size() + 4);
+ threads.addAll(readers);
+ }
+
+ threads.addAll(clientMsgWorkers.values());
+ threads.add(tcpSrvr);
+ threads.add(ipFinderCleaner);
+ threads.add(msgWorker);
+ threads.add(statsPrinter);
+
+ threads.removeAll(Collections.<IgniteSpiThread>singleton(null));
+
+ return threads;
}
/**
@@ -2122,7 +2140,9 @@ class ServerImpl extends TcpDiscoveryImpl {
else if (msg instanceof TcpDiscoveryNodeFailedMessage)
clearClientAddFinished(((TcpDiscoveryNodeFailedMessage)msg).failedNodeId());
- msgs.add(msg);
+ synchronized (msgs) {
+ msgs.add(msg);
+ }
}
/**
@@ -2161,14 +2181,16 @@ class ServerImpl extends TcpDiscoveryImpl {
// Client connection failed before it received TcpDiscoveryNodeAddedMessage.
List<TcpDiscoveryAbstractMessage> res = null;
- for (TcpDiscoveryAbstractMessage msg : msgs) {
- if (msg instanceof TcpDiscoveryNodeAddedMessage) {
- if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
- res = new ArrayList<>(msgs.size());
- }
+ synchronized (msgs) {
+ for (TcpDiscoveryAbstractMessage msg : msgs) {
+ if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+ if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
+ res = new ArrayList<>(msgs.size());
+ }
- if (res != null)
- res.add(prepare(msg, node.id()));
+ if (res != null)
+ res.add(prepare(msg, node.id()));
+ }
}
if (log.isDebugEnabled()) {
@@ -2181,20 +2203,26 @@ class ServerImpl extends TcpDiscoveryImpl {
return res;
}
else {
- if (msgs.isEmpty())
- return Collections.emptyList();
+ Collection<TcpDiscoveryAbstractMessage> cp;
- Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());
+ boolean skip;
- boolean skip = true;
+ synchronized (msgs) {
+ if (msgs.isEmpty())
+ return Collections.emptyList();
- for (TcpDiscoveryAbstractMessage msg : msgs) {
- if (skip) {
- if (msg.id().equals(lastMsgId))
- skip = false;
+ cp = new ArrayList<>(msgs.size());
+
+ skip = true;
+
+ for (TcpDiscoveryAbstractMessage msg : msgs) {
+ if (skip) {
+ if (msg.id().equals(lastMsgId))
+ skip = false;
+ }
+ else
+ cp.add(prepare(msg, node.id()));
}
- else
- cp.add(prepare(msg, node.id()));
}
cp = !skip ? cp : null;
@@ -2483,9 +2511,6 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Pending messages. */
private final PendingMessages pendingMsgs = new PendingMessages();
- /** Messages history used for client reconnect. */
- private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
-
/** Last message that updated topology. */
private TcpDiscoveryAbstractMessage lastMsg;
@@ -2659,8 +2684,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg instanceof TcpDiscoveryJoinRequestMessage)
processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
- else if (msg instanceof TcpDiscoveryClientReconnectMessage)
- processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+ else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+ if (sendMessageToRemotes(msg))
+ sendMessageAcrossRing(msg);
+ }
else if (msg instanceof TcpDiscoveryNodeAddedMessage)
processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
@@ -2695,9 +2722,6 @@ class ServerImpl extends TcpDiscoveryImpl {
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
- if (ensured && redirectToClients(msg))
- msgHist.add(msg);
-
if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
// Received a message from remote node.
onMessageExchanged();
@@ -2730,6 +2754,9 @@ class ServerImpl extends TcpDiscoveryImpl {
*/
private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
if (redirectToClients(msg)) {
+ if (spi.ensured(msg))
+ msgHist.add(msg);
+
byte[] msgBytes = null;
for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
@@ -3836,9 +3863,6 @@ class ServerImpl extends TcpDiscoveryImpl {
nodeAddedMsg.client(msg.client());
processNodeAddedMessage(nodeAddedMsg);
-
- if (nodeAddedMsg.verified())
- msgHist.add(nodeAddedMsg);
}
else if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
@@ -3941,98 +3965,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Processes client reconnect message.
- *
- * @param msg Client reconnect message.
- */
- private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
- UUID nodeId = msg.creatorNodeId();
-
- UUID locNodeId = getLocalNodeId();
-
- boolean isLocNodeRouter = locNodeId.equals(msg.routerNodeId());
-
- if (!msg.verified()) {
- TcpDiscoveryNode node = ring.node(nodeId);
-
- assert node == null || node.isClient();
-
- if (node != null) {
- node.clientRouterNodeId(msg.routerNodeId());
- node.clientAliveTime(spi.clientFailureDetectionTimeout());
- }
-
- if (isLocalNodeCoordinator()) {
- msg.verify(locNodeId);
-
- if (node != null) {
- Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
-
- if (pending != null) {
- msg.pendingMessages(pending);
- msg.success(true);
-
- if (log.isDebugEnabled())
- log.debug("Accept client reconnect, restored pending messages " +
- "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Failing reconnecting client node because failed to restore pending " +
- "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
-
- TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
- node.id(), node.internalOrder());
-
- processNodeFailedMessage(nodeFailedMsg);
-
- if (nodeFailedMsg.verified())
- msgHist.add(nodeFailedMsg);
- }
- }
- else if (log.isDebugEnabled())
- log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
-
- if (isLocNodeRouter) {
- ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
- if (wrk != null)
- wrk.addMessage(msg);
- else if (log.isDebugEnabled())
- log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
- locNodeId + ", clientNodeId=" + nodeId + ']');
- }
- else {
- if (sendMessageToRemotes(msg))
- sendMessageAcrossRing(msg);
- }
- }
- else {
- if (sendMessageToRemotes(msg))
- sendMessageAcrossRing(msg);
- }
- }
- else {
- if (isLocalNodeCoordinator())
- addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
-
- if (isLocNodeRouter) {
- ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
- if (wrk != null)
- wrk.addMessage(msg);
- else if (log.isDebugEnabled())
- log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
- locNodeId + ", clientNodeId=" + nodeId + ']');
- }
- else {
- if (ring.hasRemoteNodes() && !isLocalNodeCoordinator())
- sendMessageAcrossRing(msg);
- }
- }
- }
-
- /**
* Processes node added message.
*
* For coordinator node method marks the messages as verified for rest of nodes to apply the
@@ -4078,9 +4010,6 @@ class ServerImpl extends TcpDiscoveryImpl {
processNodeAddFinishedMessage(addFinishMsg);
- if (addFinishMsg.verified())
- msgHist.add(addFinishMsg);
-
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
return;
@@ -5145,9 +5074,6 @@ class ServerImpl extends TcpDiscoveryImpl {
locNodeId, clientNode.id(), clientNode.internalOrder());
processNodeFailedMessage(nodeFailedMsg);
-
- if (nodeFailedMsg.verified())
- msgHist.add(nodeFailedMsg);
}
}
}
@@ -5342,9 +5268,6 @@ class ServerImpl extends TcpDiscoveryImpl {
ackMsg.topologyVersion(msg.topologyVersion());
processCustomMessage(ackMsg);
-
- if (ackMsg.verified())
- msgHist.add(ackMsg);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal discovery custom message.", e);
@@ -5446,12 +5369,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (joiningEmpty && isLocalNodeCoordinator()) {
TcpDiscoveryCustomEventMessage msg;
- while ((msg = pollPendingCustomeMessage()) != null) {
+ while ((msg = pollPendingCustomeMessage()) != null)
processCustomMessage(msg);
-
- if (msg.verified())
- msgHist.add(msg);
- }
}
}
@@ -6005,24 +5924,22 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
- if (clientMsgWrk != null) {
- TcpDiscoverySpiState state = spiStateCopy();
+ TcpDiscoverySpiState state = spiStateCopy();
- if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+ if (state == CONNECTED) {
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
- if (clientMsgWrk.getState() == State.NEW)
- clientMsgWrk.start();
+ if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+ clientMsgWrk.start();
- msgWorker.addMessage(msg);
+ processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
- continue;
- }
- else {
- spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
+ continue;
+ }
+ else {
+ spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
- break;
- }
+ break;
}
}
else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
@@ -6266,6 +6183,100 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Processes client reconnect message.
+ *
+ * @param msg Client reconnect message.
+ */
+ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+ UUID nodeId = msg.creatorNodeId();
+
+ UUID locNodeId = getLocalNodeId();
+
+ boolean isLocNodeRouter = msg.routerNodeId().equals(locNodeId);
+
+ TcpDiscoveryNode node = ring.node(nodeId);
+
+ assert node == null || node.isClient();
+
+ if (node != null) {
+ node.clientRouterNodeId(msg.routerNodeId());
+ node.clientAliveTime(spi.clientFailureDetectionTimeout());
+ }
+
+ if (!msg.verified()) {
+ if (isLocNodeRouter || isLocalNodeCoordinator()) {
+ if (node != null) {
+ Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
+
+ if (pending != null) {
+ msg.verify(locNodeId);
+ msg.pendingMessages(pending);
+ msg.success(true);
+
+ if (log.isDebugEnabled())
+ log.debug("Accept client reconnect, restored pending messages " +
+ "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else if (!isLocalNodeCoordinator()) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to restore pending messages for reconnecting client. " +
+ "Forwarding reconnection message to coordinator " +
+ "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else {
+ msg.verify(locNodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Failing reconnecting client node because failed to restore pending " +
+ "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+
+ TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
+ node.id(), node.internalOrder());
+
+ msgWorker.addMessage(nodeFailedMsg);
+ }
+ }
+ else {
+ msg.verify(locNodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
+ }
+
+ if (msg.verified() && isLocNodeRouter) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+ if (wrk != null)
+ wrk.addMessage(msg);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+ locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else
+ msgWorker.addMessage(msg);
+ }
+ else
+ msgWorker.addMessage(msg);
+ }
+ else {
+ if (isLocalNodeCoordinator())
+ msgWorker.addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
+
+ if (isLocNodeRouter) {
+ ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+ if (wrk != null)
+ wrk.addMessage(msg);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+ locNodeId + ", clientNodeId=" + nodeId + ']');
+ }
+ else if (ring.hasRemoteNodes() && !isLocalNodeCoordinator())
+ msgWorker.addMessage(msg);
+ }
+ }
+
+ /**
* Processes client metrics update message.
*
* @param msg Client metrics update message.
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index b31e2e4..f3cf48d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -299,9 +299,9 @@ abstract class TcpDiscoveryImpl {
/**
* <strong>FOR TEST ONLY!!!</strong>
*
- * @return Worker thread.
+ * @return Worker threads.
*/
- protected abstract IgniteSpiThread workerThread();
+ protected abstract Collection<IgniteSpiThread> threads();
/**
* @throws IgniteSpiException If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 689ac72..f1c826a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -56,15 +56,9 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
/** */
private final static long FAILURE_THRESHOLD = 10_000;
- /** */
- private final static long CLIENT_FAILURE_THRESHOLD = 30_000;
-
/** Failure detection timeout for nodes configuration. */
private static long failureThreshold = FAILURE_THRESHOLD;
- /** Client failure detection timeout for nodes configuration. */
- private static long clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
-
/** */
private static boolean useTestSpi;
@@ -75,7 +69,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
/** {@inheritDoc} */
@Override protected long clientFailureDetectionTimeout() {
- return clientFailureThreshold;
+ return clientFailureDetectionTimeout;
}
/** {@inheritDoc} */
@@ -153,7 +147,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
*/
public void testFailureTimeoutServerClient() throws Exception {
failureThreshold = 3000;
- clientFailureThreshold = 2000;
+ clientFailureDetectionTimeout = 2000;
try {
startServerNodes(1);
@@ -190,13 +184,12 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
long detectTime = failureDetectTime[0] - failureTime;
assertTrue("Client node failure detected too fast: " + detectTime + "ms",
- detectTime > clientFailureThreshold - 200);
+ detectTime > clientFailureDetectionTimeout - 200);
assertTrue("Client node failure detected too slow: " + detectTime + "ms",
- detectTime < clientFailureThreshold + 5000);
+ detectTime < clientFailureDetectionTimeout + 5000);
}
finally {
failureThreshold = FAILURE_THRESHOLD;
- clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
}
}
@@ -207,7 +200,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
*/
public void testFailureTimeout3Server() throws Exception {
failureThreshold = 1000;
- clientFailureThreshold = 10000;
+ clientFailureDetectionTimeout = 10000;
useTestSpi = true;
try {
@@ -254,11 +247,10 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
assertTrue("Server node failure detected too fast: " + detectTime + "ms",
detectTime > failureThreshold - 100);
assertTrue("Server node failure detected too slow: " + detectTime + "ms",
- detectTime < clientFailureThreshold);
+ detectTime < clientFailureDetectionTimeout);
}
finally {
failureThreshold = FAILURE_THRESHOLD;
- clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
useTestSpi = false;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 329783e..ee88b0f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -62,8 +62,8 @@ import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
@@ -73,6 +73,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
@@ -87,7 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
*/
public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
protected static final AtomicInteger srvIdx = new AtomicInteger();
@@ -123,6 +124,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private static CountDownLatch clientFailedLatch;
/** */
+ private static CountDownLatch clientReconnectedLatch;
+
+ /** */
private static CountDownLatch msgLatch;
/** */
@@ -138,10 +142,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
/** */
+ protected Integer reconnectCnt;
+
+ /** */
private boolean longSockTimeouts;
/** */
- private long clientFailureDetectionTimeout = 1000;
+ protected long clientFailureDetectionTimeout = 1000;
/** */
private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
@@ -207,6 +214,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
disco.setJoinTimeout(joinTimeout);
disco.setNetworkTimeout(netTimeout);
+ if (reconnectCnt != null)
+ disco.setReconnectCount(reconnectCnt);
+
disco.setClientReconnectDisabled(reconnectDisabled);
if (disco instanceof TestTcpDiscoverySpi)
@@ -253,6 +263,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientIpFinder = null;
joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+ clientFailureDetectionTimeout = 1000;
longSockTimeouts = false;
assert G.allGrids().isEmpty();
@@ -558,6 +569,221 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * Client should reconnect to available server without EVT_CLIENT_NODE_RECONNECTED event.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnRouterSuspend() throws Exception {
+ reconnectAfterSuspend(false);
+ }
+
+ /**
+ * Client should receive all topology updates after reconnect.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnRouterSuspendTopologyChange() throws Exception {
+ reconnectAfterSuspend(true);
+ }
+
+ /**
+ * @param changeTop If {@code true} topology is changed after client disconnects
+ * @throws Exception if failed.
+ */
+ private void reconnectAfterSuspend(boolean changeTop) throws Exception {
+ reconnectCnt = 2;
+
+ startServerNodes(2);
+
+ Ignite srv0 = grid("server-0");
+ TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode();
+
+ TcpDiscoveryNode srv1Node = (TcpDiscoveryNode)grid("server-1").cluster().localNode();
+
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+ clientIpFinder.setAddresses(
+ Collections.singleton("localhost:" + srv0Node.discoveryPort()));
+
+ startClientNodes(1);
+
+ Ignite client = grid("client-0");
+ TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+ TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+
+ UUID clientNodeId = clientNode.id();
+
+ checkNodes(2, 1);
+
+ clientIpFinder.setAddresses(Collections.singleton("localhost:" + srv1Node.discoveryPort()));
+
+ srvFailedLatch = new CountDownLatch(1);
+
+ attachListeners(2, 1);
+
+ log.info("Pausing router");
+
+ TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv0.configuration().getDiscoverySpi();
+
+ int joinedNodesNum = 3;
+ final CountDownLatch srvJoinedLatch = new CountDownLatch(joinedNodesNum);
+
+ if (changeTop) {
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event e) {
+ srvJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+ }
+
+ srvSpi.pauseAll(true);
+
+ if (changeTop)
+ startServerNodes(joinedNodesNum);
+
+ try {
+ await(srvFailedLatch, 60_000);
+
+ if (changeTop)
+ await(srvJoinedLatch, 5000);
+
+ assertEquals("connected", clientSpi.getSpiState());
+ assertEquals(clientNodeId, clientNode.id());
+ assertEquals(srv1Node.id(), clientNode.clientRouterNodeId());
+ }
+ finally {
+ srvSpi.resumeAll();
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testClientReconnectHistoryMissingOnRouter() throws Exception {
+ clientFailureDetectionTimeout = 60000;
+ netTimeout = 60000;
+
+ startServerNodes(2);
+
+ Ignite srv0 = grid("server-0");
+ TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode();
+
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+ clientIpFinder.setAddresses(
+ Collections.singleton("localhost:" + srv0Node.discoveryPort()));
+
+ startClientNodes(1);
+
+ attachListeners(0, 1);
+
+ Ignite client = grid("client-0");
+ TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+ TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+ UUID clientNodeId = clientNode.id();
+
+ checkNodes(2, 1);
+
+ clientSpi.pauseAll(true);
+
+ stopGrid(srv0.name());
+
+ startServerNodes(1);
+
+ Ignite srv2 = grid("server-2");
+ TcpDiscoveryNode srv2Node = (TcpDiscoveryNode)srv2.cluster().localNode();
+ clientIpFinder.setAddresses(
+ Collections.singleton("localhost:" + srv2Node.discoveryPort()));
+
+ clientSpi.resumeAll();
+
+ awaitPartitionMapExchange();
+
+ assertEquals("connected", clientSpi.getSpiState());
+ assertEquals(clientNodeId, clientNode.id());
+ assertEquals(srv2Node.id(), clientNode.clientRouterNodeId());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectAfterPause() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite client = grid("client-0");
+ TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+
+ clientReconnectedLatch = new CountDownLatch(1);
+
+ attachListeners(0, 1);
+
+ clientSpi.pauseAll(false);
+
+ try {
+ clientSpi.brakeConnection();
+
+ Thread.sleep(clientFailureDetectionTimeout() * 2);
+ }
+ finally {
+ clientSpi.resumeAll();
+ }
+
+ await(clientReconnectedLatch);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testReconnectAfterMassiveTopologyChange() throws Exception {
+ clientIpFinder = IP_FINDER;
+
+ clientFailureDetectionTimeout = 60000;
+ netTimeout = 60000;
+
+ int initSrvsNum = 5;
+ int killNum = 3;
+ int iterations = 10;
+
+ startServerNodes(initSrvsNum);
+ startClientNodes(1);
+
+ Ignite client = grid("client-0");
+ TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+ TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+ final UUID clientNodeId = clientNode.id();
+
+ final CountDownLatch srvJoinedLatch = new CountDownLatch(iterations * killNum);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event e) {
+ srvJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+
+ int minAliveSrvId = 0;
+
+ for (int i = 0; i < iterations; i++) {
+ for (int j = 0; j < killNum; j++) {
+ stopGrid(minAliveSrvId);
+
+ minAliveSrvId++;
+ }
+
+ startServerNodes(killNum);
+
+ awaitPartitionMapExchange();
+ }
+
+ await(srvJoinedLatch);
+ assertEquals("connected", clientSpi.getSpiState());
+ assertEquals(clientNodeId, clientNode.id());
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testClientReconnectOnNetworkProblem() throws Exception {
@@ -1410,17 +1636,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
srvSpi.failNode(client.cluster().localNode().id(), null);
- if (changeTop) {
- Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+ assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+ assertTrue(failLatch.await(5000, MILLISECONDS));
- srvNodeIds.add(g.cluster().localNode().id());
+ if (changeTop) {
+ startServerNodes(1);
clientSpi.resumeAll();
}
- assertTrue(disconnectLatch.await(5000, MILLISECONDS));
assertTrue(reconnectLatch.await(5000, MILLISECONDS));
- assertTrue(failLatch.await(5000, MILLISECONDS));
assertTrue(joinLatch.await(5000, MILLISECONDS));
long topVer = changeTop ? 5L : 4L;
@@ -2026,6 +2251,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}, EVT_NODE_FAILED);
}
}
+
+ if (clientReconnectedLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Reconnected event fired on client: " + evt);
+
+ clientReconnectedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_RECONNECTED);
+ }
+ }
}
/**
@@ -2095,7 +2334,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @throws InterruptedException If interrupted.
*/
protected void await(CountDownLatch latch) throws InterruptedException {
- assertTrue("Latch count: " + latch.getCount(), latch.await(awaitTime(), MILLISECONDS));
+ await(latch, awaitTime());
+ }
+
+ /**
+ * @param latch Latch.
+ * @param timeout Timeout.
+ * @throws InterruptedException If interrupted.
+ */
+ protected void await(CountDownLatch latch, long timeout) throws InterruptedException {
+ assertTrue("Latch count: " + latch.getCount(), latch.await(timeout, MILLISECONDS));
}
/**
@@ -2324,8 +2572,10 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
public void pauseAll(boolean suspend) {
pauseResumeOperation(true, openSockLock, writeLock);
- if (suspend)
- impl.workerThread().suspend();
+ if (suspend) {
+ for (Thread t : impl.threads())
+ t.suspend();
+ }
}
/**
@@ -2334,7 +2584,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
public void resumeAll() {
pauseResumeOperation(false, openSockLock, writeLock);
- impl.workerThread().resume();
+ for (IgniteSpiThread t : impl.threads())
+ t.resume();
}
/** {@inheritDoc} */