You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/09/07 22:46:48 UTC
[21/50] [abbrv] ignite git commit: # ignite-901 client reconnect
support
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4ca2995..2bce637 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -228,6 +228,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
public static final byte HANDSHAKE_MSG_TYPE = -3;
+ /** */
+ private ConnectGateway connectGate;
+
/** Server listener. */
private final GridNioServerListener<Message> srvLsnr =
new GridNioServerListenerAdapter<Message>() {
@@ -248,7 +251,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Sending local node ID to newly accepted session: " + ses);
- ses.send(nodeIdMsg);
+ ses.send(nodeIdMessage());
}
}
@@ -289,136 +292,163 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
- @Override public void onMessage(GridNioSession ses, Message msg) {
- UUID sndId = ses.meta(NODE_ID_META);
+ /**
+ * @param ses Session.
+ * @param msg Message.
+ */
+ private void onFirstMessage(GridNioSession ses, Message msg) {
+ UUID sndId;
- if (sndId == null) {
- assert ses.accepted();
+ if (msg instanceof NodeIdMessage)
+ sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+ else {
+ assert msg instanceof HandshakeMessage : msg;
- if (msg instanceof NodeIdMessage)
- sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
- else {
- assert msg instanceof HandshakeMessage : msg;
+ sndId = ((HandshakeMessage)msg).nodeId();
+ }
- sndId = ((HandshakeMessage)msg).nodeId();
- }
+ if (log.isDebugEnabled())
+ log.debug("Remote node ID received: " + sndId);
- if (log.isDebugEnabled())
- log.debug("Remote node ID received: " + sndId);
+ final UUID old = ses.addMeta(NODE_ID_META, sndId);
- final UUID old = ses.addMeta(NODE_ID_META, sndId);
+ assert old == null;
- assert old == null;
+ final ClusterNode rmtNode = getSpiContext().node(sndId);
- final ClusterNode rmtNode = getSpiContext().node(sndId);
+ if (rmtNode == null) {
+ if (log.isDebugEnabled())
+ log.debug("Close incoming connection, unknown node: " + sndId);
- if (rmtNode == null) {
- ses.close();
+ ses.close();
- return;
- }
+ return;
+ }
- ClusterNode locNode = getSpiContext().localNode();
+ ClusterNode locNode = getSpiContext().localNode();
- if (ses.remoteAddress() == null)
- return;
+ if (ses.remoteAddress() == null)
+ return;
- GridCommunicationClient oldClient = clients.get(sndId);
+ GridCommunicationClient oldClient = clients.get(sndId);
- boolean hasShmemClient = false;
+ boolean hasShmemClient = false;
- if (oldClient != null) {
- if (oldClient instanceof GridTcpNioCommunicationClient) {
- if (log.isDebugEnabled())
- log.debug("Received incoming connection when already connected " +
+ if (oldClient != null) {
+ if (oldClient instanceof GridTcpNioCommunicationClient) {
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection when already connected " +
"to this node, rejecting [locNode=" + locNode.id() +
", rmtNode=" + sndId + ']');
- ses.send(new RecoveryLastReceivedMessage(-1));
+ ses.send(new RecoveryLastReceivedMessage(-1));
- return;
- }
- else {
- assert oldClient instanceof GridShmemCommunicationClient;
+ return;
+ }
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
- hasShmemClient = true;
- }
+ hasShmemClient = true;
}
+ }
- GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
+ GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
- GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
+ GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
- assert msg instanceof HandshakeMessage : msg;
+ assert msg instanceof HandshakeMessage : msg;
- HandshakeMessage msg0 = (HandshakeMessage)msg;
+ HandshakeMessage msg0 = (HandshakeMessage)msg;
- final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
+ final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
- if (oldFut == null) {
- oldClient = clients.get(sndId);
+ if (oldFut == null) {
+ oldClient = clients.get(sndId);
- if (oldClient != null) {
- if (oldClient instanceof GridTcpNioCommunicationClient) {
- if (log.isDebugEnabled())
- log.debug("Received incoming connection when already connected " +
+ if (oldClient != null) {
+ if (oldClient instanceof GridTcpNioCommunicationClient) {
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection when already connected " +
"to this node, rejecting [locNode=" + locNode.id() +
", rmtNode=" + sndId + ']');
- ses.send(new RecoveryLastReceivedMessage(-1));
+ ses.send(new RecoveryLastReceivedMessage(-1));
- return;
- }
- else {
- assert oldClient instanceof GridShmemCommunicationClient;
+ return;
+ }
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
- hasShmemClient = true;
- }
+ hasShmemClient = true;
}
+ }
- boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
- if (log.isDebugEnabled())
- log.debug("Received incoming connection from remote node " +
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection from remote node " +
"[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
- if (reserved) {
- try {
- GridTcpNioCommunicationClient client =
+ if (reserved) {
+ try {
+ GridTcpNioCommunicationClient client =
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
- fut.onDone(client);
- }
- finally {
- clientFuts.remove(rmtNode.id(), fut);
- }
+ fut.onDone(client);
+ }
+ finally {
+ clientFuts.remove(rmtNode.id(), fut);
}
}
- else {
- if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
- if (log.isDebugEnabled()) {
- log.debug("Received incoming connection from remote node while " +
+ }
+ else {
+ if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received incoming connection from remote node while " +
"connecting to this node, rejecting [locNode=" + locNode.id() +
", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
", rmtNodeOrder=" + rmtNode.order() + ']');
- }
-
- ses.send(new RecoveryLastReceivedMessage(-1));
}
- else {
- boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+
+ ses.send(new RecoveryLastReceivedMessage(-1));
+ }
+ else {
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
- if (reserved) {
- GridTcpNioCommunicationClient client =
+ if (reserved) {
+ GridTcpNioCommunicationClient client =
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
- fut.onDone(client);
- }
+ fut.onDone(client);
}
}
}
+ }
+
+ @Override public void onMessage(GridNioSession ses, Message msg) {
+ UUID sndId = ses.meta(NODE_ID_META);
+
+ if (sndId == null) {
+ assert ses.accepted() : ses;
+
+ if (!connectGate.tryEnter()) {
+ if (log.isDebugEnabled())
+ log.debug("Close incoming connection, failed to enter gateway.");
+
+ ses.close();
+
+ return;
+ }
+
+ try {
+ onFirstMessage(ses, msg);
+ }
+ finally {
+ connectGate.leave();
+ }
+ }
else {
rcvdMsgsCnt.increment();
@@ -700,9 +730,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Address resolver. */
private AddressResolver addrRslvr;
- /** Local node ID message. */
- private NodeIdMessage nodeIdMsg;
-
/** Received messages count. */
private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
@@ -739,8 +766,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
- assert evt instanceof DiscoveryEvent;
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+ assert evt instanceof DiscoveryEvent : evt;
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
}
@@ -1237,8 +1264,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
- nodeIdMsg = new NodeIdMessage(getLocalNodeId());
-
assertParameter(locPort > 1023, "locPort > 1023");
assertParameter(locPort <= 0xffff, "locPort < 0xffff");
assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1346,6 +1371,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
+ connectGate = new ConnectGateway();
+
if (shmemSrv != null) {
shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
@@ -1608,6 +1635,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Safety.
ctxInitLatch.countDown();
+ if (connectGate != null)
+ connectGate.stopped();
+
// Force closing.
for (GridCommunicationClient client : clients.values())
client.forceClose();
@@ -1617,6 +1647,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
getSpiContext().removeLocalEventListener(discoLsnr);
}
+ /** {@inheritDoc} */
+ @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
+ connectGate.disconnected(reconnectFut);
+
+ for (GridCommunicationClient client : clients.values())
+ client.forceClose();
+
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+ "Failed to connect client node disconnected.");
+
+ for (GridFutureAdapter<GridCommunicationClient> clientFut : clientFuts.values())
+ clientFut.onDone(err);
+
+ recoveryDescs.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClientReconnected(boolean clusterRestarted) {
+ connectGate.reconnected();
+ }
+
/**
* @param nodeId Left node ID.
*/
@@ -1666,10 +1717,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isTraceEnabled())
log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
- UUID locNodeId = getLocalNodeId();
-
- if (node.id().equals(locNodeId))
- notifyListener(locNodeId, msg, NOOP);
+ if (node.id().equals(getLocalNode().id()))
+ notifyListener(node.id(), msg, NOOP);
else {
GridCommunicationClient client = null;
@@ -1834,7 +1883,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
- return createTcpClient(node);
+ connectGate.enter();
+
+ try {
+ return createTcpClient(node);
+ }
+ finally {
+ connectGate.leave();
+ }
}
/**
@@ -2208,7 +2264,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
if (recovery != null) {
- HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(),
+ HandshakeMessage msg = new HandshakeMessage(getLocalNode().id(),
recovery.incrementConnectCount(),
recovery.receivedCount());
@@ -2228,7 +2284,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
ch.write(buf);
}
else
- ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
+ ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
if (recovery != null) {
if (log.isDebugEnabled())
@@ -2355,6 +2411,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
getExceptionRegistry().onException(msg, e);
}
+ /**
+ * @return Node ID message.
+ */
+ private NodeIdMessage nodeIdMessage() {
+ return new NodeIdMessage(getLocalNode().id());
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpCommunicationSpi.class, this);
@@ -2692,10 +2755,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
ClusterNode node = recoveryDesc.node();
- if (clients.containsKey(node.id()) ||
- !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
- !getSpiContext().pingNode(node.id()))
+ try {
+ if (clients.containsKey(node.id()) ||
+ !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
+ !getSpiContext().pingNode(node.id()))
+ return;
+ }
+ catch (IgniteClientDisconnectedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to ping node, client disconnected.");
+
return;
+ }
try {
if (log.isDebugEnabled())
@@ -2860,15 +2931,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
+ UUID id = getLocalNode().id();
+
+ NodeIdMessage msg = new NodeIdMessage(id);
+
out.write(U.IGNITE_HEADER);
out.write(NODE_ID_MSG_TYPE);
- out.write(nodeIdMsg.nodeIdBytes);
+ out.write(msg.nodeIdBytes);
out.flush();
if (log.isDebugEnabled())
- log.debug("Sent local node ID [locNodeId=" + getLocalNodeId() + ", rmtNodeId="
- + rmtNodeId + ']');
+ log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']');
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to perform handshake.", e);
@@ -3082,6 +3156,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param nodeId Node ID.
*/
private NodeIdMessage(UUID nodeId) {
+ assert nodeId != null;
+
nodeIdBytes = U.uuidToBytes(nodeId);
nodeIdBytesWithType = new byte[nodeIdBytes.length + 1];
@@ -3131,4 +3207,86 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return S.toString(NodeIdMessage.class, this);
}
}
+
+ /**
+ *
+ */
+ private class ConnectGateway {
+ /** */
+ private GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
+
+ /** */
+ private IgniteException err;
+
+ /**
+ *
+ */
+ void enter() {
+ lock.readLock();
+
+ if (err != null) {
+ lock.readUnlock();
+
+ throw err;
+ }
+ }
+
+ /**
+ * @return {@code True} if entered gateway.
+ */
+ boolean tryEnter() {
+ lock.readLock();
+
+ boolean res = err == null;
+
+ if (!res)
+ lock.readUnlock();
+
+ return res;
+ }
+
+ /**
+ *
+ */
+ void leave() {
+ lock.readUnlock();
+ }
+
+ /**
+ * @param reconnectFut Reconnect future.
+ */
+ void disconnected(IgniteFuture<?> reconnectFut) {
+ lock.writeLock();
+
+ err = new IgniteClientDisconnectedException(reconnectFut, "Failed to connect, client node disconnected.");
+
+ lock.writeUnlock();
+ }
+
+ /**
+ *
+ */
+ void reconnected() {
+ lock.writeLock();
+
+ try {
+ if (err instanceof IgniteClientDisconnectedException)
+ err = null;
+ }
+ finally {
+ lock.writeUnlock();
+ }
+ }
+
+ /**
+ *
+ */
+ void stopped() {
+ lock.readLock();
+
+ err = new IgniteException("Failed to connect, node stopped.");
+
+ lock.readUnlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
index 46d6716..038ea59 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
@@ -39,7 +39,8 @@ public interface DiscoverySpiDataExchange {
/**
* Notifies discovery manager about data received from remote node.
*
- * @param joiningNodeId Remote node ID.
+ * @param joiningNodeId ID of new node that joins topology.
+ * @param nodeId ID of the node that provided the data.
* @param data Collection of discovery data objects from different components.
*/
public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data);
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 3f05f59..572ba2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -41,6 +42,7 @@ import java.util.concurrent.atomic.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.*;
/**
*
@@ -71,7 +73,7 @@ class ClientImpl extends TcpDiscoveryImpl {
private SocketReader sockReader;
/** */
- private boolean segmented;
+ private volatile State state;
/** Last message ID. */
private volatile IgniteUuid lastMsgId;
@@ -94,6 +96,10 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
protected MessageWorker msgWorker;
+ /** */
+ @GridToStringExclude
+ private int joinCnt;
+
/**
* @param adapter Adapter.
*/
@@ -157,6 +163,9 @@ class ClientImpl extends TcpDiscoveryImpl {
locNode = spi.locNode;
+ // Marshal credentials for backward compatibility and security.
+ marshalCredentials(locNode);
+
sockWriter = new SocketWriter();
sockWriter.start();
@@ -258,23 +267,36 @@ class ClientImpl extends TcpDiscoveryImpl {
if (oldFut != null)
fut = oldFut;
else {
- if (spi.getSpiContext().isStopping()) {
+ State state = this.state;
+
+ if (spi.getSpiContext().isStopping() || state == STOPPED || state == SEGMENTED) {
if (pingFuts.remove(nodeId, fut))
fut.onDone(false);
return false;
}
+ else if (state == DISCONNECTED) {
+ if (pingFuts.remove(nodeId, fut))
+ fut.onDone(new IgniteClientDisconnectedCheckedException(null,
+ "Failed to ping node, client node disconnected."));
+ }
+ else {
+ final GridFutureAdapter<Boolean> finalFut = fut;
- final GridFutureAdapter<Boolean> finalFut = fut;
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- if (pingFuts.remove(nodeId, finalFut))
- finalFut.onDone(false);
- }
- }, spi.netTimeout);
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ if (pingFuts.remove(nodeId, finalFut)) {
+ if (ClientImpl.this.state == DISCONNECTED)
+ finalFut.onDone(new IgniteClientDisconnectedCheckedException(null,
+ "Failed to ping node, client node disconnected."));
+ else
+ finalFut.onDone(false);
+ }
+ }
+ }, spi.netTimeout);
- sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+ sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+ }
}
}
@@ -285,7 +307,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return false;
}
catch (IgniteCheckedException e) {
- throw new IgniteSpiException(e); // Should newer occur.
+ throw new IgniteSpiException(e);
}
}
@@ -325,8 +347,13 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
- if (segmented)
- throw new IgniteException("Failed to send custom message: client is disconnected");
+ State state = this.state;
+
+ if (state == SEGMENTED)
+ throw new IgniteException("Failed to send custom message: client is segmented.");
+
+ if (state == DISCONNECTED)
+ throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected.");
try {
sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
@@ -361,14 +388,11 @@ class ClientImpl extends TcpDiscoveryImpl {
* @see TcpDiscoverySpi#joinTimeout
*/
@SuppressWarnings("BusyWait")
- @Nullable private Socket joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
+ @Nullable private T2<Socket, Boolean> joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
Collection<InetSocketAddress> addrs = null;
long startTime = U.currentTimeMillis();
- // Marshal credentials for backward compatibility and security.
- marshalCredentials(locNode);
-
while (true) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
@@ -400,7 +424,7 @@ class ClientImpl extends TcpDiscoveryImpl {
InetSocketAddress addr = it.next();
- T2<Socket, Integer> sockAndRes = sendJoinRequest(recon, addr);
+ T3<Socket, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
if (sockAndRes == null) {
it.remove();
@@ -414,7 +438,7 @@ class ClientImpl extends TcpDiscoveryImpl {
switch (sockAndRes.get2()) {
case RES_OK:
- return sock;
+ return new T2<>(sock, sockAndRes.get3());
case RES_CONTINUE_JOIN:
case RES_WAIT:
@@ -445,9 +469,9 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
* @param recon {@code True} if reconnects.
* @param addr Address.
- * @return Socket and connect response.
+ * @return Socket, connect response and client acknowledge support flag.
*/
- @Nullable private T2<Socket, Integer> sendJoinRequest(boolean recon, InetSocketAddress addr) {
+ @Nullable private T3<Socket, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
assert addr != null;
if (log.isDebugEnabled())
@@ -493,9 +517,18 @@ class ClientImpl extends TcpDiscoveryImpl {
tstamp = U.currentTimeMillis();
- TcpDiscoveryAbstractMessage msg = recon ?
- new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) :
- new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
+ TcpDiscoveryAbstractMessage msg;
+
+ if (!recon) {
+ TcpDiscoveryNode node = locNode;
+
+ if (locNode.order() > 0)
+ node = locNode.clientReconnectNode();
+
+ msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
+ }
+ else
+ msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);
msg.client(true);
@@ -507,7 +540,7 @@ class ClientImpl extends TcpDiscoveryImpl {
log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
", rmtNodeId=" + rmtNodeId + ']');
- return new T2<>(sock, spi.readReceipt(sock, ackTimeout0));
+ return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), res.clientAck());
}
catch (IOException | IgniteCheckedException e) {
U.closeQuiet(sock);
@@ -786,10 +819,16 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.stats.onMessageReceived(msg);
- if (spi.ensured(msg) && joinLatch.getCount() == 0L)
- lastMsgId = msg.id();
+ boolean ack = msg instanceof TcpDiscoveryClientAckResponse;
+
+ if (!ack) {
+ if (spi.ensured(msg) && joinLatch.getCount() == 0L)
+ lastMsgId = msg.id();
- msgWorker.addMessage(msg);
+ msgWorker.addMessage(msg);
+ }
+ else
+ sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg);
}
}
catch (IOException e) {
@@ -823,8 +862,14 @@ class ClientImpl extends TcpDiscoveryImpl {
private Socket sock;
/** */
+ private boolean clientAck;
+
+ /** */
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
+ /** */
+ private TcpDiscoveryAbstractMessage unackedMsg;
+
/**
*
*/
@@ -845,11 +890,16 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
* @param sock Socket.
+ * @param clientAck {@code True} if server supports client message acknowledgement.
*/
- private void setSocket(Socket sock) {
+ private void setSocket(Socket sock, boolean clientAck) {
synchronized (mux) {
this.sock = sock;
+ this.clientAck = clientAck;
+
+ unackedMsg = null;
+
mux.notifyAll();
}
}
@@ -863,6 +913,21 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
+ /**
+ * @param res Acknowledge response.
+ */
+ void ackReceived(TcpDiscoveryClientAckResponse res) {
+ synchronized (mux) {
+ if (unackedMsg != null) {
+ assert unackedMsg.id().equals(res.messageId()) : unackedMsg;
+
+ unackedMsg = null;
+ }
+
+ mux.notifyAll();
+ }
+ }
+
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
TcpDiscoveryAbstractMessage msg = null;
@@ -892,10 +957,43 @@ class ClientImpl extends TcpDiscoveryImpl {
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
msgLsnr.apply(msg);
+ boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse);
+
try {
+ if (ack) {
+ synchronized (mux) {
+ assert unackedMsg == null : unackedMsg;
+
+ unackedMsg = msg;
+ }
+ }
+
spi.writeToSocket(sock, msg);
msg = null;
+
+ if (ack) {
+ long waitEnd = U.currentTimeMillis() + spi.ackTimeout;
+
+ TcpDiscoveryAbstractMessage unacked;
+
+ synchronized (mux) {
+ while (unackedMsg != null && U.currentTimeMillis() < waitEnd)
+ mux.wait(waitEnd);
+
+ unacked = unackedMsg;
+
+ unackedMsg = null;
+ }
+
+ if (unacked != null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get acknowledge for message, will try to reconnect " +
+ "[msg=" + unacked + ", timeout=" + spi.ackTimeout + ']');
+
+ throw new IOException("Failed to get acknowledge for message: " + unacked);
+ }
+ }
}
catch (IOException e) {
if (log.isDebugEnabled())
@@ -926,6 +1024,9 @@ class ClientImpl extends TcpDiscoveryImpl {
private volatile Socket sock;
/** */
+ private boolean clientAck;
+
+ /** */
private boolean join;
/**
@@ -948,8 +1049,6 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
- assert !segmented;
-
boolean success = false;
Exception err = null;
@@ -958,11 +1057,14 @@ class ClientImpl extends TcpDiscoveryImpl {
long startTime = U.currentTimeMillis();
+ if (log.isDebugEnabled())
+ log.debug("Started reconnect process [join=" + join + ", timeout=" + timeout + ']');
+
try {
while (true) {
- sock = joinTopology(true, timeout);
+ T2<Socket, Boolean> joinRes = joinTopology(true, timeout);
- if (sock == null) {
+ if (joinRes == null) {
if (join) {
joinError(new IgniteSpiException("Join process timed out, connection failed and " +
"failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
@@ -970,11 +1072,14 @@ class ClientImpl extends TcpDiscoveryImpl {
}
else
U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
- "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+ "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
return;
}
+ sock = joinRes.get1();
+ clientAck = joinRes.get2();
+
if (isInterrupted())
throw new InterruptedException();
@@ -999,6 +1104,10 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
if (res.creatorNodeId().equals(getLocalNodeId())) {
+ if (log.isDebugEnabled())
+ log.debug("Received reconnect response [success=" + res.success() +
+ ", msg=" + msg + ']');
+
if (res.success()) {
msgWorker.addMessage(res);
@@ -1008,9 +1117,11 @@ class ClientImpl extends TcpDiscoveryImpl {
}
success = true;
- }
- return;
+ return;
+ }
+ else
+ return;
}
}
else if (spi.ensured(msg)) {
@@ -1081,6 +1192,9 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private Reconnector reconnector;
+ /** */
+ private boolean nodeAdded;
+
/**
*
*/
@@ -1091,45 +1205,37 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@SuppressWarnings("InfiniteLoopStatement")
@Override protected void body() throws InterruptedException {
+ state = STARTING;
+
spi.stats.onJoinStarted();
try {
- final Socket sock = joinTopology(false, spi.joinTimeout);
-
- if (sock == null) {
- joinError(new IgniteSpiException("Join process timed out."));
-
- return;
- }
-
- currSock = sock;
-
- sockWriter.setSocket(sock);
-
- if (spi.joinTimeout > 0) {
- timer.schedule(new TimerTask() {
- @Override public void run() {
- if (joinLatch.getCount() > 0)
- queue.add(JOIN_TIMEOUT);
- }
- }, spi.joinTimeout);
- }
-
- sockReader.setSocket(sock, locNode.clientRouterNodeId());
+ tryJoin();
while (true) {
Object msg = queue.take();
if (msg == JOIN_TIMEOUT) {
- if (joinLatch.getCount() > 0) {
+ if (state == STARTING) {
joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
"join request (consider increasing 'joinTimeout' configuration property) " +
- "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+ "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
break;
}
+ else if (state == DISCONNECTED) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to reconnect, local node segmented " +
+ "[joinTimeout=" + spi.joinTimeout + ']');
+
+ state = SEGMENTED;
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
}
else if (msg == SPI_STOP) {
+ state = STOPPED;
+
assert spi.getSpiContext().isStopping();
if (currSock != null) {
@@ -1148,7 +1254,7 @@ class ClientImpl extends TcpDiscoveryImpl {
boolean join = joinLatch.getCount() > 0;
- if (spi.getSpiContext().isStopping() || segmented) {
+ if (spi.getSpiContext().isStopping() || state == SEGMENTED) {
leaveLatch.countDown();
if (join) {
@@ -1158,6 +1264,9 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
else {
+ if (log.isDebugEnabled())
+ log.debug("Connection closed, will try to restore connection.");
+
assert reconnector == null;
final Reconnector reconnector = new Reconnector(join);
@@ -1167,19 +1276,64 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
else if (msg == SPI_RECONNECT_FAILED) {
- if (!segmented) {
- segmented = true;
+ reconnector.cancel();
+ reconnector.join();
- reconnector.cancel();
- reconnector.join();
+ reconnector = null;
- notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ if (spi.isClientReconnectDisabled()) {
+ if (state != SEGMENTED && state != STOPPED) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to restore closed connection, reconnect disabled, " +
+ "local node segmented [networkTimeout=" + spi.netTimeout + ']');
+ }
+
+ state = SEGMENTED;
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
+ }
+ else {
+ if (state == STARTING || state == CONNECTED) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to restore closed connection, will try to reconnect " +
+ "[networkTimeout=" + spi.netTimeout + ", joinTimeout=" + spi.joinTimeout + ']');
+ }
+
+ state = DISCONNECTED;
+
+ nodeAdded = false;
+
+ IgniteClientDisconnectedCheckedException err =
+ new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
+ "client node disconnected.");
+
+ for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+ GridFutureAdapter<Boolean> fut = e.getValue();
+
+ if (pingFuts.remove(e.getKey(), fut))
+ fut.onDone(err);
+ }
+
+ notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+ }
+
+ UUID newId = UUID.randomUUID();
+
+ if (log.isInfoEnabled()) {
+ log.info("Client node disconnected from cluster, will try to reconnect with new id " +
+ "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']');
+ }
+
+ locNode.onClientDisconnected(newId);
+
+ tryJoin();
}
}
else {
TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
- if (joinLatch.getCount() > 0) {
+ if (joining()) {
IgniteSpiException err = null;
if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
@@ -1190,7 +1344,15 @@ class ClientImpl extends TcpDiscoveryImpl {
err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
if (err != null) {
- joinError(err);
+ if (state == DISCONNECTED) {
+ U.error(log, "Failed to reconnect, segment local node.", err);
+
+ state = SEGMENTED;
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
+ else
+ joinError(err);
break;
}
@@ -1215,6 +1377,48 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
+ * Performs one join attempt and, on success, hands the new socket over to the socket writer and reader.
+ * <p>
+ * May be called only in {@code STARTING} or {@code DISCONNECTED} state. If {@code joinTopology} returns
+ * {@code null}, a join started from {@code STARTING} is reported through {@code joinError}, while a join
+ * started from {@code DISCONNECTED} switches the state to {@code SEGMENTED} and fires
+ * {@code EVT_NODE_SEGMENTED}.
+ *
+ * @throws InterruptedException If interrupted.
+ */
+ private void tryJoin() throws InterruptedException {
+ assert state == DISCONNECTED || state == STARTING : state;
+
+ // Captured before joining: decides below how a failed attempt is reported.
+ boolean join = state == STARTING;
+
+ // Numbers this attempt; a timeout task scheduled for an earlier attempt compares against it.
+ joinCnt++;
+
+ T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
+
+ if (joinRes == null) {
+ if (join)
+ joinError(new IgniteSpiException("Join process timed out."));
+ else {
+ state = SEGMENTED;
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
+
+ return;
+ }
+
+ currSock = joinRes.get1();
+
+ sockWriter.setSocket(joinRes.get1(), joinRes.get2());
+
+ if (spi.joinTimeout > 0) {
+ final int joinCnt0 = joinCnt;
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ // Post JOIN_TIMEOUT only if no newer attempt was started and the join is still in progress.
+ if (joinCnt == joinCnt0 && joining())
+ queue.add(JOIN_TIMEOUT);
+ }
+ }, spi.joinTimeout);
+ }
+
+ sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId());
+ }
+
+ /**
* @param msg Message.
*/
protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
@@ -1246,6 +1450,22 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
+ * @return {@code True} if client in process of join, i.e. state is {@code STARTING} or {@code DISCONNECTED}.
+ */
+ private boolean joining() {
+ // Read the field once so that both comparisons below see the same value.
+ ClientImpl.State state = ClientImpl.this.state;
+
+ return state == STARTING || state == DISCONNECTED;
+ }
+
+ /**
+ * @return {@code True} if client disconnected, i.e. state is {@code DISCONNECTED}.
+ */
+ private boolean disconnected() {
+ return state == DISCONNECTED;
+ }
+
+ /**
* @param msg Message.
*/
private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
@@ -1257,12 +1477,15 @@ class ClientImpl extends TcpDiscoveryImpl {
UUID newNodeId = node.id();
if (getLocalNodeId().equals(newNodeId)) {
- if (joinLatch.getCount() > 0) {
+ if (joining()) {
Collection<TcpDiscoveryNode> top = msg.topology();
if (top != null) {
spi.gridStartTime = msg.gridStartTime();
+ if (disconnected())
+ rmtNodes.clear();
+
for (TcpDiscoveryNode n : top) {
if (n.order() > 0)
n.visible(true);
@@ -1272,6 +1495,8 @@ class ClientImpl extends TcpDiscoveryImpl {
topHist.clear();
+ nodeAdded = true;
+
if (msg.topologyHistory() != null)
topHist.putAll(msg.topologyHistory());
}
@@ -1309,7 +1534,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
if (getLocalNodeId().equals(msg.nodeId())) {
- if (joinLatch.getCount() > 0) {
+ if (joining()) {
Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
if (dataMap != null) {
@@ -1324,13 +1549,22 @@ class ClientImpl extends TcpDiscoveryImpl {
locNode.order(topVer);
- notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg));
+ Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg);
+
+ notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes);
+
+ boolean disconnected = disconnected();
+
+ state = CONNECTED;
+
+ if (disconnected)
+ notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes);
+ else
+ spi.stats.onJoinFinished();
joinErr.set(null);;
joinLatch.countDown();
-
- spi.stats.onJoinFinished();
}
else if (log.isDebugEnabled())
log.debug("Discarding node add finished message (this message has already been processed) " +
@@ -1438,7 +1672,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @return {@code True} if received node added message for local node.
*/
private boolean nodeAdded() {
- return !topHist.isEmpty();
+ return nodeAdded;
}
/**
@@ -1539,7 +1773,7 @@ class ClientImpl extends TcpDiscoveryImpl {
currSock = reconnector.sock;
- sockWriter.setSocket(currSock);
+ sockWriter.setSocket(currSock, reconnector.clientAck);
sockReader.setSocket(currSock, locNode.clientRouterNodeId());
reconnector = null;
@@ -1583,7 +1817,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
- if (msg.verified() && joinLatch.getCount() == 0) {
+ if (msg.verified() && state == CONNECTED) {
DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
@@ -1719,4 +1953,24 @@ class ClientImpl extends TcpDiscoveryImpl {
this.sock = sock;
}
}
+
+ /**
+ * State of the client discovery implementation.
+ */
+ enum State {
+ /** Join is in progress and the client was not connected before (a failed join is reported as a join error). */
+ STARTING,
+
+ /** Node add finished message for the local node was processed. */
+ CONNECTED,
+
+ /** Connection could not be restored; client tries to join again with a new node ID. */
+ DISCONNECTED,
+
+ /** Reconnect is disabled or failed; {@code EVT_NODE_SEGMENTED} is fired when this state is set. */
+ SEGMENTED,
+
+ /** NOTE(review): presumably set when the SPI is stopped; the assignment is not visible here, confirm. */
+ STOPPED
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d51293e..1a28e86 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1447,6 +1447,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Heartbeats sender has been started.");
+ UUID nodeId = getConfiguredNodeId();
+
while (!isInterrupted()) {
if (spiStateCopy() != CONNECTED) {
if (log.isDebugEnabled())
@@ -1455,7 +1457,7 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId());
+ TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId);
msg.verify(getLocalNodeId());
@@ -1593,39 +1595,47 @@ class ServerImpl extends TcpDiscoveryImpl {
// Addresses registered in IP finder.
Collection<InetSocketAddress> regAddrs = spi.registeredAddresses();
- // Remove all addresses that belong to alive nodes, leave dead-node addresses.
- Collection<InetSocketAddress> rmvAddrs = F.view(
- regAddrs,
- F.notContains(currAddrs),
- new P1<InetSocketAddress>() {
- private final Map<InetSocketAddress, Boolean> pingResMap = new HashMap<>();
+ P1<InetSocketAddress> p = new P1<InetSocketAddress>() {
+ private final Map<InetSocketAddress, Boolean> pingResMap = new HashMap<>();
- @Override public boolean apply(InetSocketAddress addr) {
- Boolean res = pingResMap.get(addr);
+ @Override public boolean apply(InetSocketAddress addr) {
+ Boolean res = pingResMap.get(addr);
- if (res == null) {
- try {
- res = pingNode(addr, null).get1() != null;
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to ping node [addr=" + addr +
- ", err=" + e.getMessage() + ']');
-
- res = false;
- }
- finally {
- pingResMap.put(addr, res);
- }
+ if (res == null) {
+ try {
+ res = pingNode(addr, null).get1() != null;
}
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to ping node [addr=" + addr +
+ ", err=" + e.getMessage() + ']');
- return !res;
+ res = false;
+ }
+ finally {
+ pingResMap.put(addr, res);
+ }
}
+
+ return !res;
}
- );
+ };
+
+ ArrayList<InetSocketAddress> rmvAddrs = null;
+
+ for (InetSocketAddress addr : regAddrs) {
+ boolean rmv = !F.contains(currAddrs, addr) && p.apply(addr);
+
+ if (rmv) {
+ if (rmvAddrs == null)
+ rmvAddrs = new ArrayList<>();
+
+ rmvAddrs.add(addr);
+ }
+ }
// Unregister dead-nodes addresses.
- if (!rmvAddrs.isEmpty()) {
+ if (rmvAddrs != null) {
spi.ipFinder.unregisterAddresses(rmvAddrs);
if (log.isDebugEnabled())
@@ -4077,7 +4087,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
- UUID locNodeId = getLocalNodeId();
+ UUID locNodeId = getConfiguredNodeId();
ClientMessageWorker clientMsgWrk = null;
@@ -4170,6 +4180,9 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryHandshakeResponse res =
new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
+ if (req.client())
+ res.clientAck(true);
+
spi.writeToSocket(sock, res);
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
@@ -4313,7 +4326,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (state == CONNECTED) {
spi.writeToSocket(msg, sock, RES_OK);
- if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+ if (clientMsgWrk.getState() == State.NEW)
clientMsgWrk.start();
msgWorker.addMessage(msg);
@@ -4457,7 +4470,14 @@ class ServerImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(msg);
// Send receipt back.
- if (clientMsgWrk == null)
+ if (clientMsgWrk != null) {
+ TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id());
+
+ ack.verify(locNodeId);
+
+ clientMsgWrk.addMessage(ack);
+ }
+ else
spi.writeToSocket(msg, sock, RES_OK);
}
catch (IgniteCheckedException e) {
@@ -4567,8 +4587,11 @@ class ServerImpl extends TcpDiscoveryImpl {
msg.responded(true);
- if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+ if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) {
+ clientMsgWrk.clientVersion(U.productVersion(msg.node()));
+
clientMsgWrk.start();
+ }
msgWorker.addMessage(msg);
@@ -4679,6 +4702,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** */
private final AtomicReference<GridFutureAdapter<Boolean>> pingFut = new AtomicReference<>();
+ /** */
+ private IgniteProductVersion clientVer;
+
/**
* @param sock Socket.
* @param clientNodeId Node ID.
@@ -4691,6 +4717,13 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * @param clientVer Client version.
+ */
+ void clientVersion(IgniteProductVersion clientVer) {
+ this.clientVer = clientVer;
+ }
+
+ /**
* @return Current client metrics.
*/
ClusterMetrics metrics() {
@@ -4709,17 +4742,40 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
assert msg.verified() : msg;
- if (log.isDebugEnabled())
- log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
- + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+ if (msg instanceof TcpDiscoveryClientAckResponse) {
+ if (clientVer == null) {
+ ClusterNode node = spi.getNode(clientNodeId);
- try {
- prepareNodeAddedMessage(msg, clientNodeId, null, null);
+ if (node != null)
+ clientVer = IgniteUtils.productVersion(node);
+ else if (log.isDebugEnabled())
+ log.debug("Skip sending message ack to client, fail to get client node " +
+ "[sock=" + sock + ", locNodeId=" + getLocalNodeId() +
+ ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+ }
+
+ if (clientVer != null &&
+ clientVer.compareTo(TcpDiscoveryClientAckResponse.CLIENT_ACK_SINCE_VERSION) >= 0) {
+ if (log.isDebugEnabled())
+ log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
+ + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
- writeToSocket(sock, msg);
+ writeToSocket(sock, msg);
+ }
}
- finally {
- clearNodeAddedMessage(msg);
+ else {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+ + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+
+ prepareNodeAddedMessage(msg, clientNodeId, null, null);
+
+ writeToSocket(sock, msg);
+ }
+ finally {
+ clearNodeAddedMessage(msg);
+ }
}
}
catch (IgniteCheckedException | IOException e) {
@@ -4829,7 +4885,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
if (log.isDebugEnabled())
- log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']');
+ log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
while (!isInterrupted()) {
TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index ace917f..c271b7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -112,7 +112,14 @@ abstract class TcpDiscoveryImpl {
* @return Local node ID.
*/
public UUID getLocalNodeId() {
- return spi.getLocalNodeId();
+ return spi.locNode.id();
+ }
+
+ /**
+ * @return Configured node ID (actual node ID can be different if client reconnects).
+ */
+ public UUID getConfiguredNodeId() {
+ return spi.cfgNodeId;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 7663fe6..431d198 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -260,6 +260,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Local node. */
protected TcpDiscoveryNode locNode;
+ /** */
+ protected UUID cfgNodeId;
+
/** Local host. */
protected InetAddress locHost;
@@ -327,6 +330,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** */
private boolean forceSrvMode;
+ /** */
+ private boolean clientReconnectDisabled;
+
/** {@inheritDoc} */
@Override public String getSpiState() {
return impl.getSpiState();
@@ -417,6 +423,29 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/**
+ * If {@code true} client does not try to reconnect after
+ * server detected client node failure.
+ *
+ * @return Client reconnect disabled flag.
+ */
+ public boolean isClientReconnectDisabled() {
+ return clientReconnectDisabled;
+ }
+
+ /**
+ * Sets client reconnect disabled flag.
+ * <p>
+ * If {@code true} client does not try to reconnect after
+ * server detected client node failure.
+ *
+ * @param clientReconnectDisabled Client reconnect disabled flag.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setClientReconnectDisabled(boolean clientReconnectDisabled) {
+ this.clientReconnectDisabled = clientReconnectDisabled;
+ }
+
+ /**
* Inject resources
*
* @param ignite Ignite.
@@ -844,7 +873,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
locNode = new TcpDiscoveryNode(
- getLocalNodeId(),
+ ignite.configuration().getNodeId(),
addrs.get1(),
addrs.get2(),
srvPort,
@@ -1615,6 +1644,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
mcastIpFinder.setLocalAddress(locAddr);
}
+ cfgNodeId = ignite.configuration().getNodeId();
+
impl.spiStart(gridName);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 22f56c3..032cf01 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -441,6 +441,25 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
this.clientRouterNodeId = clientRouterNodeId;
}
+ /**
+ * Replaces ID of this node with the given one.
+ *
+ * @param newId New node ID.
+ */
+ public void onClientDisconnected(UUID newId) {
+ id = newId;
+ }
+
+ /**
+ * Creates a new node object with the same ID, addresses, host names, discovery port, metrics provider
+ * and version as this node.
+ *
+ * @return Copy of local node for client reconnect request.
+ */
+ public TcpDiscoveryNode clientReconnectNode() {
+ TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver);
+
+ // Attributes are assigned by reference, not copied.
+ node.attrs = attrs;
+ node.clientRouterNodeId = clientRouterNodeId;
+
+ return node;
+ }
+
/** {@inheritDoc} */
@Override public int compareTo(@Nullable TcpDiscoveryNode node) {
if (node == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 21dbf4f..6f52152 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -40,6 +40,9 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
/** */
protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
+ /** */
+ protected static final int CLIENT_ACK_FLAG_POS = 4;
+
/** Sender of the message (transient). */
private transient UUID sndNodeId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
new file mode 100644
index 0000000..ce3943a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Acknowledgement for a discovery message, carrying ID of the acknowledged message.
+ */
+public class TcpDiscoveryClientAckResponse extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Version ({@code 1.4.1}) that client versions are compared against before an ack is sent to the client. */
+ public static final IgniteProductVersion CLIENT_ACK_SINCE_VERSION = IgniteProductVersion.fromString("1.4.1");
+
+ /** ID of the acknowledged message. */
+ private final IgniteUuid msgId;
+
+ /**
+ * @param creatorNodeId Creator node ID.
+ * @param msgId Message ID to ack.
+ */
+ public TcpDiscoveryClientAckResponse(UUID creatorNodeId, IgniteUuid msgId) {
+ super(creatorNodeId);
+
+ this.msgId = msgId;
+ }
+
+ /**
+ * @return Acknowledged message ID.
+ */
+ public IgniteUuid messageId() {
+ return msgId;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Always returns {@code true} for this message type.
+ */
+ @Override public boolean highPriority() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryClientAckResponse.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
index 5c2f798..ac4be50 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
@@ -61,6 +61,20 @@ public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage {
this.order = order;
}
+ /**
+ * @return {@code True} if server supports client message acknowledge.
+ */
+ public boolean clientAck() {
+ return getFlag(CLIENT_ACK_FLAG_POS);
+ }
+
+ /**
+ * @param clientAck {@code True} if server supports client message acknowledge.
+ */
+ public void clientAck(boolean clientAck) {
+ setFlag(CLIENT_ACK_FLAG_POS, clientAck);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 7a88426..000782a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -257,7 +257,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
registerMBean(gridName, this, FileSwapSpaceSpiMBean.class);
- String path = baseDir + File.separator + gridName + File.separator + getLocalNodeId();
+ String path = baseDir + File.separator + gridName + File.separator + ignite.configuration().getNodeId();
try {
dir = U.resolveWorkDirectory(path, true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
index abc9109..bf499c3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.*;
@@ -104,8 +105,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
* Test kernal gateway that always return uninitialized user stack trace.
*/
private static final GridKernalGateway TEST_GATEWAY = new GridKernalGateway() {
- @Override public void lightCheck() throws IllegalStateException {}
-
@Override public void readLock() throws IllegalStateException {}
@Override public void readLockAnyway() {}
@@ -122,10 +121,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
@Override public void writeUnlock() {}
- @Override public void addStopListener(Runnable lsnr) {}
-
- @Override public void removeStopListener(Runnable lsnr) {}
-
@Override public String userStackTrace() {
return null;
}
@@ -133,5 +128,13 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
@Override public boolean tryWriteLock(long timeout) {
return false;
}
+
+ @Override public GridFutureAdapter<?> onDisconnected() {
+ return null;
+ }
+
+ @Override public void onReconnected() {
+ // No-op.
+ }
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
new file mode 100644
index 0000000..fbaea11
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.net.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
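+/**
+ * Base class for client reconnect tests: configures nodes with test discovery and communication SPIs,
+ * starts the requested number of server and client nodes before tests and provides reconnect helpers.
+ */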
+public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final long RECONNECT_TIMEOUT = 10_000;
+
+ /** */
+ protected boolean clientMode;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ // Test discovery SPI: lets a test hold back join requests (see TestTcpDiscoverySpi.writeLatch).
+ TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+ disco.setJoinTimeout(2 * 60_000);
+ disco.setSocketTimeout(1000);
+ disco.setNetworkTimeout(2000);
+
+ cfg.setDiscoverySpi(disco);
+
+ // Test communication SPI: lets a test drop messages of a chosen class.
+ BlockTpcCommunicationSpi commSpi = new BlockTpcCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ // Flag is toggled by the test (see beforeTestsStarted) around node start.
+ if (clientMode)
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /**
+ * Waits up to {@link #RECONNECT_TIMEOUT} milliseconds for the latch; if it does not open in time, logs
+ * the remaining count, dumps threads and fails the test.
+ *
+ * @param latch Latch.
+ * @throws Exception If failed.
+ */
+ protected void waitReconnectEvent(CountDownLatch latch) throws Exception {
+ if (!latch.await(RECONNECT_TIMEOUT, MILLISECONDS)) {
+ log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount());
+
+ U.dumpThreads(log);
+
+ fail("Failed to wait for disconnect/reconnect event.");
+ }
+ }
+
+ /**
+ * @return Number of server nodes started before tests (no servers are started if not positive).
+ */
+ protected abstract int serverCount();
+
+ /**
+ * @return Number of client nodes started before tests, {@code 0} unless overridden.
+ */
+ protected int clientCount() {
+ return 0;
+ }
+
+ /**
+ * @param ignite Node.
+ * @return Discovery SPI taken from the node configuration, cast to {@link TestTcpDiscoverySpi}.
+ */
+ protected TestTcpDiscoverySpi spi(Ignite ignite) {
+ return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+ }
+
+ /**
+ * @param ignite Node.
+ * @return Communication SPI taken from the node configuration, cast to {@link BlockTpcCommunicationSpi}.
+ */
+ protected BlockTpcCommunicationSpi commSpi(Ignite ignite) {
+ return ((BlockTpcCommunicationSpi)ignite.configuration().getCommunicationSpi());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ int srvs = serverCount();
+
+ if (srvs > 0)
+ startGrids(srvs);
+
+ int clients = clientCount();
+
+ if (clients > 0) {
+ // Flag is read by getConfiguration(), so nodes started while it is set are configured as clients.
+ clientMode = true;
+
+ // NOTE(review): first argument looks like the starting grid index (clients follow servers) - confirm.
+ startGridsMultiThreaded(srvs, clients);
+
+ clientMode = false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ // Nodes are started once in beforeTestsStarted(), so they are stopped once here.
+ stopAllGrids();
+ }
+
+ /**
+ * Looks up the node whose ID is the client router node ID of the given client's local node.
+ * Asserts that the local node is a client, that it has a router node ID and that the lookup succeeds.
+ *
+ * @param client Client.
+ * @return Server node client connected to.
+ */
+ protected Ignite clientRouter(Ignite client) {
+ TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode();
+
+ assertTrue(node.isClient());
+ assertNotNull(node.clientRouterNodeId());
+
+ Ignite srv = G.ignite(node.clientRouterNodeId());
+
+ assertNotNull(srv);
+
+ return srv;
+ }
+
+ /**
+ * Asserts that the future is not {@code null} and is not completed yet.
+ *
+ * @param fut Future.
+ * @throws Exception If failed.
+ */
+ protected void assertNotDone(IgniteInternalFuture<?> fut) throws Exception {
+ assertNotNull(fut);
+
+ // Result is included into the failure message; presumably get() throws instead if the future failed.
+ if (fut.isDone())
+ fail("Future completed with result: " + fut.get());
+ }
+
+ /**
+ * Reconnect client node.
+ * <p>
+ * Sets the join request latch on the client's discovery SPI, fails the client node through the
+ * server's discovery SPI, waits for {@code EVT_CLIENT_NODE_DISCONNECTED}, runs the given closure, opens the
+ * latch and waits for {@code EVT_CLIENT_NODE_RECONNECTED}.
+ *
+ * @param client Client.
+ * @param srv Server.
+ * @param disconnectedC Closure which will be run when client node disconnected.
+ * @throws Exception If failed.
+ */
+ protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC)
+ throws Exception {
+ final TestTcpDiscoverySpi clientSpi = spi(client);
+ final TestTcpDiscoverySpi srvSpi = spi(srv);
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ log.info("Block reconnect.");
+
+ // From now on join requests written through the client SPI wait until this latch is opened.
+ clientSpi.writeLatch = new CountDownLatch(1);
+
+ IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ };
+
+ client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ srvSpi.failNode(client.cluster().localNode().id(), null);
+
+ // NOTE(review): if either wait below fails, the listener stays registered and, for the first wait,
+ // the write latch is never opened - consider try/finally cleanup.
+ waitReconnectEvent(disconnectLatch);
+
+ if (disconnectedC != null)
+ disconnectedC.run();
+
+ log.info("Allow reconnect.");
+
+ clientSpi.writeLatch.countDown();
+
+ waitReconnectEvent(reconnectLatch);
+
+ client.events().stopLocalListen(p);
+ }
+
+ /**
+ * Asserts that the cause of the given exception is {@link IgniteClientDisconnectedException} with a
+ * non-null reconnect future.
+ *
+ * @param e Cache exception expected to be caused by client disconnect.
+ * @return Reconnect future.
+ */
+ protected IgniteFuture<?> check(CacheException e) {
+ log.info("Expected exception: " + e);
+
+ // Log the full exception before the assertion below fails with the cause only.
+ if (!(e.getCause() instanceof IgniteClientDisconnectedException))
+ log.error("Unexpected cause: " + e.getCause(), e);
+
+ assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof IgniteClientDisconnectedException);
+
+ IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();
+
+ assertNotNull(e0.reconnectFuture());
+
+ return e0.reconnectFuture();
+ }
+
+ /**
+ * Checks the exception with {@link #check(CacheException)} and calls {@code get()} on the reconnect future.
+ *
+ * @param e Cache exception expected to be caused by client disconnect.
+ */
+ protected void checkAndWait(CacheException e) {
+ check(e).get();
+ }
+
+ /**
+ * Asserts that the exception has a non-null reconnect future and calls {@code get()} on it.
+ *
+ * @param e Client disconnected exception.
+ */
+ protected void checkAndWait(IgniteClientDisconnectedException e) {
+ log.info("Expected exception: " + e);
+
+ assertNotNull(e.reconnectFuture());
+
+ e.reconnectFuture().get();
+ }
+
+ /**
+ * Discovery SPI for tests that can hold back writing of {@link TcpDiscoveryJoinRequestMessage}s.
+ */
+ protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** While not {@code null}, join requests wait on this latch before being written. */
+ volatile CountDownLatch writeLatch;
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+ throws IOException, IgniteCheckedException {
+ if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+ // Read the volatile field once: the null check and the wait use the same latch.
+ CountDownLatch writeLatch0 = writeLatch;
+
+ if (writeLatch0 != null) {
+ log.info("Block join request send: " + msg);
+
+ U.await(writeLatch0);
+ }
+ }
+
+ super.writeToSocket(sock, msg);
+ }
+ }
+
+ /**
+ * Communication SPI for tests that can drop {@link GridIoMessage}s whose payload is of a chosen class
+ * and can record payload class names of sent messages.
+ */
+ protected static class BlockTpcCommunicationSpi extends TcpCommunicationSpi {
+ /** Payload class of messages to drop, {@code null} if nothing is blocked. */
+ volatile Class msgCls;
+
+ /** Whether payload class names of sent messages are collected. */
+ AtomicBoolean collectStart = new AtomicBoolean(false);
+
+ /** Collected payload class names mapped to the node the latest such message was sent to. */
+ ConcurrentHashMap<String, ClusterNode> classes = new ConcurrentHashMap<>();
+
+ /** Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ // Read the volatile field once so that the null check and the comparison below use the same value.
+ Class msgCls0 = msgCls;
+
+ if (collectStart.get() && msg instanceof GridIoMessage)
+ classes.put(((GridIoMessage)msg).message().getClass().getName(), node);
+
+ if (msgCls0 != null && msg instanceof GridIoMessage
+ && ((GridIoMessage)msg).message().getClass().equals(msgCls0)) {
+ log.info("Block message: " + msg);
+
+ // Blocked message is dropped: it is never passed to the parent SPI.
+ return;
+ }
+
+ super.sendMessage(node, msg);
+ }
+
+ /**
+ * @param clazz Class of messages which will be blocked.
+ */
+ public void blockMessage(Class clazz) {
+ msgCls = clazz;
+ }
+
+ /**
+ * Stops blocking messages.
+ */
+ public void unblockMessage() {
+ msgCls = null;
+ }
+
+ /**
+ * Starts collecting payload class names of sent messages.
+ */
+ public void start() {
+ collectStart.set(true);
+ }
+
+ /**
+ * Prints collected payload class names (logged at error level).
+ */
+ public void print() {
+ for (String s : classes.keySet())
+ log.error(s);
+ }
+ }
+}