You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 18:27:42 UTC
[19/31] incubator-ignite git commit: ignite-471-2: huge merge from
sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index fd17791..359de1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -157,12 +157,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Default idle connection timeout (value is <tt>30000</tt>ms). */
public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
- /** Default value for connection buffer flush frequency (value is <tt>100</tt> ms). */
- public static final long DFLT_CONN_BUF_FLUSH_FREQ = 100;
-
- /** Default value for connection buffer size (value is <tt>0</tt>). */
- public static final int DFLT_CONN_BUF_SIZE = 0;
-
/** Default socket send and receive buffer size. */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
@@ -267,7 +261,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug("Session was closed but there are unacknowledged messages, " +
"will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
- recoveryWorker.addReconnectRequest(recoveryData);
+ commWorker.addReconnectRequest(recoveryData);
}
}
else
@@ -603,13 +597,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Idle connection timeout. */
private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
- /** Connection buffer flush frequency. */
- private volatile long connBufFlushFreq = DFLT_CONN_BUF_FLUSH_FREQ;
-
- /** Connection buffer size. */
- @SuppressWarnings("RedundantFieldInitialization")
- private int connBufSize = DFLT_CONN_BUF_SIZE;
-
/** Connect timeout. */
private long connTimeout = DFLT_CONN_TIMEOUT;
@@ -647,17 +634,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Socket write timeout. */
private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
- /** Idle client worker. */
- private IdleClientWorker idleClientWorker;
-
- /** Flush client worker. */
- private ClientFlushWorker clientFlushWorker;
-
- /** Socket timeout worker. */
- private SocketTimeoutWorker sockTimeoutWorker;
-
- /** Recovery worker. */
- private RecoveryWorker recoveryWorker;
+ /** Recovery and idle clients handler. */
+ private CommunicationWorker commWorker;
/** Clients. */
private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
@@ -882,31 +860,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* Sets connection buffer size. If set to {@code 0} connection buffer is disabled.
- * <p>
- * If not provided, default value is {@link #DFLT_CONN_BUF_SIZE}.
*
* @param connBufSize Connection buffer size.
* @see #setConnectionBufferFlushFrequency(long)
*/
@IgniteSpiConfiguration(optional = true)
public void setConnectionBufferSize(int connBufSize) {
- this.connBufSize = connBufSize;
+ // No-op.
}
/** {@inheritDoc} */
@Override public int getConnectionBufferSize() {
- return connBufSize;
+ return 0;
}
/** {@inheritDoc} */
@IgniteSpiConfiguration(optional = true)
@Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
- this.connBufFlushFreq = connBufFlushFreq;
+ // No-op.
}
/** {@inheritDoc} */
@Override public long getConnectionBufferFlushFrequency() {
- return connBufFlushFreq;
+ return 0;
}
/**
@@ -1174,8 +1150,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(locPort <= 0xffff, "locPort < 0xffff");
assertParameter(locPortRange >= 0, "locPortRange >= 0");
assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0");
- assertParameter(connBufFlushFreq > 0, "connBufFlushFreq > 0");
- assertParameter(connBufSize >= 0, "connBufSize >= 0");
assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
@@ -1245,8 +1219,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("idleConnTimeout", idleConnTimeout));
log.debug(configInfo("directBuf", directBuf));
log.debug(configInfo("directSendBuf", directSndBuf));
- log.debug(configInfo("connBufSize", connBufSize));
- log.debug(configInfo("connBufFlushFreq", connBufFlushFreq));
log.debug(configInfo("selectorsCnt", selectorsCnt));
log.debug(configInfo("tcpNoDelay", tcpNoDelay));
log.debug(configInfo("sockSndBuf", sockSndBuf));
@@ -1261,11 +1233,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
}
- if (connBufSize > 8192)
- U.warn(log, "Specified communication IO buffer size is larger than recommended (ignore if done " +
- "intentionally) [specified=" + connBufSize + ", recommended=8192]",
- "Specified communication IO buffer size is larger than recommended (ignore if done intentionally).");
-
if (!tcpNoDelay)
U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " +
"since may produce significant delays with some scenarios.");
@@ -1274,23 +1241,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
nioSrvr.start();
- idleClientWorker = new IdleClientWorker();
+ commWorker = new CommunicationWorker();
- idleClientWorker.start();
-
- recoveryWorker = new RecoveryWorker();
-
- recoveryWorker.start();
-
- if (connBufSize > 0) {
- clientFlushWorker = new ClientFlushWorker();
-
- clientFlushWorker.start();
- }
-
- sockTimeoutWorker = new SocketTimeoutWorker();
-
- sockTimeoutWorker.start();
+ commWorker.start();
// Ack start.
if (log.isDebugEnabled())
@@ -1445,15 +1398,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (nioSrvr != null)
nioSrvr.stop();
- U.interrupt(idleClientWorker);
- U.interrupt(clientFlushWorker);
- U.interrupt(sockTimeoutWorker);
- U.interrupt(recoveryWorker);
+ U.interrupt(commWorker);
- U.join(idleClientWorker, log);
- U.join(clientFlushWorker, log);
- U.join(sockTimeoutWorker, log);
- U.join(recoveryWorker, log);
+ U.join(commWorker, log);
// Force closing on stop (safety).
for (GridCommunicationClient client : clients.values())
@@ -1461,7 +1408,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Clear resources.
nioSrvr = null;
- idleClientWorker = null;
+ commWorker = null;
boundTcpPort = -1;
@@ -1899,7 +1846,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
) throws IgniteCheckedException {
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
- sockTimeoutWorker.addTimeoutObject(obj);
+ addTimeoutObject(obj);
long rcvCnt = 0;
@@ -2005,7 +1952,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
boolean cancelled = obj.cancel();
if (cancelled)
- sockTimeoutWorker.removeTimeoutObject(obj);
+ removeTimeoutObject(obj);
// Ignoring whatever happened after timeout - reporting only timeout event.
if (!cancelled)
@@ -2041,15 +1988,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (nioSrvr != null)
nioSrvr.stop();
- U.interrupt(idleClientWorker);
- U.interrupt(clientFlushWorker);
- U.interrupt(sockTimeoutWorker);
- U.interrupt(recoveryWorker);
+ U.interrupt(commWorker);
- U.join(idleClientWorker, log);
- U.join(clientFlushWorker, log);
- U.join(sockTimeoutWorker, log);
- U.join(recoveryWorker, log);
+ U.join(commWorker, log);
for (GridCommunicationClient client : clients.values())
client.forceClose();
@@ -2156,80 +2097,95 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
*
*/
- private class IdleClientWorker extends IgniteSpiThread {
+ private class CommunicationWorker extends IgniteSpiThread {
+ /** */
+ private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
+
/**
*
*/
- IdleClientWorker() {
- super(gridName, "nio-idle-client-collector", log);
+ private CommunicationWorker() {
+ super(gridName, "tcp-comm-worker", log);
}
/** {@inheritDoc} */
- @SuppressWarnings({"BusyWait"})
@Override protected void body() throws InterruptedException {
+ if (log.isDebugEnabled())
+ log.debug("Tcp communication worker has been started.");
+
while (!isInterrupted()) {
- cleanupRecovery();
+ GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
- for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
- UUID nodeId = e.getKey();
+ if (recoveryDesc != null)
+ processRecovery(recoveryDesc);
+ else
+ processIdle();
+ }
+ }
- GridCommunicationClient client = e.getValue();
+ /**
+ *
+ */
+ private void processIdle() {
+ cleanupRecovery();
- ClusterNode node = getSpiContext().node(nodeId);
+ for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
+ UUID nodeId = e.getKey();
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Forcing close of non-existent node connection: " + nodeId);
+ GridCommunicationClient client = e.getValue();
- client.forceClose();
+ ClusterNode node = getSpiContext().node(nodeId);
- clients.remove(nodeId, client);
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Forcing close of non-existent node connection: " + nodeId);
- continue;
- }
+ client.forceClose();
- GridNioRecoveryDescriptor recovery = null;
+ clients.remove(nodeId, client);
- if (client instanceof GridTcpNioCommunicationClient) {
- recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+ continue;
+ }
- if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
- RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+ GridNioRecoveryDescriptor recovery = null;
- if (log.isDebugEnabled())
- log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
- ", rcvCnt=" + msg.received() + ']');
+ if (client instanceof GridTcpNioCommunicationClient) {
+ recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
- nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+ if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+ RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
- recovery.lastAcknowledged(msg.received());
+ if (log.isDebugEnabled())
+ log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+ ", rcvCnt=" + msg.received() + ']');
- continue;
- }
- }
+ nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
- long idleTime = client.getIdleTime();
+ recovery.lastAcknowledged(msg.received());
- if (idleTime >= idleConnTimeout) {
- if (recovery != null &&
- recovery.nodeAlive(getSpiContext().node(nodeId)) &&
- !recovery.messagesFutures().isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Node connection is idle, but there are unacknowledged messages, " +
- "will wait: " + nodeId);
+ continue;
+ }
+ }
- continue;
- }
+ long idleTime = client.getIdleTime();
+ if (idleTime >= idleConnTimeout) {
+ if (recovery != null &&
+ recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+ !recovery.messagesFutures().isEmpty()) {
if (log.isDebugEnabled())
- log.debug("Closing idle node connection: " + nodeId);
+ log.debug("Node connection is idle, but there are unacknowledged messages, " +
+ "will wait: " + nodeId);
- if (client.close() || client.closed())
- clients.remove(nodeId, client);
+ continue;
}
- }
- Thread.sleep(idleConnTimeout);
+ if (log.isDebugEnabled())
+ log.debug("Closing idle node connection: " + nodeId);
+
+ if (client.close() || client.closed())
+ clients.remove(nodeId, client);
+ }
}
}
@@ -2264,212 +2220,39 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
}
- }
-
- /**
- *
- */
- private class ClientFlushWorker extends IgniteSpiThread {
- /**
- *
- */
- ClientFlushWorker() {
- super(gridName, "nio-client-flusher", log);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"BusyWait"})
- @Override protected void body() throws InterruptedException {
- while (!isInterrupted()) {
- long connBufFlushFreq0 = connBufFlushFreq;
-
- for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
- GridCommunicationClient client = entry.getValue();
-
- if (client.reserve()) {
- boolean err = true;
-
- try {
- client.flushIfNeeded(connBufFlushFreq0);
-
- err = false;
- }
- catch (IOException e) {
- if (getSpiContext().pingNode(entry.getKey()))
- U.error(log, "Failed to flush client: " + client, e);
- else {
- if (log.isDebugEnabled())
- log.debug("Failed to flush client (node left): " + client);
-
- onException("Failed to flush client (node left): " + client, e);
- }
- }
- finally {
- if (err)
- client.forceClose();
- else
- client.release();
- }
- }
- }
-
- Thread.sleep(connBufFlushFreq0);
- }
- }
- }
-
- /**
- * Handles sockets timeouts.
- */
- private class SocketTimeoutWorker extends IgniteSpiThread {
- /** Time-based sorted set for timeout objects. */
- private final GridConcurrentSkipListSet<HandshakeTimeoutObject> timeoutObjs =
- new GridConcurrentSkipListSet<>(new Comparator<HandshakeTimeoutObject>() {
- @Override public int compare(HandshakeTimeoutObject o1, HandshakeTimeoutObject o2) {
- long time1 = o1.endTime();
- long time2 = o2.endTime();
-
- long id1 = o1.id();
- long id2 = o2.id();
-
- return time1 < time2 ? -1 : time1 > time2 ? 1 :
- id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
- }
- });
-
- /** Mutex. */
- private final Object mux0 = new Object();
-
- /**
- *
- */
- SocketTimeoutWorker() {
- super(gridName, "tcp-comm-sock-timeout-worker", log);
- }
-
- /**
- * @param timeoutObj Timeout object to add.
- */
- @SuppressWarnings({"NakedNotify"})
- public void addTimeoutObject(HandshakeTimeoutObject timeoutObj) {
- assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
-
- timeoutObjs.add(timeoutObj);
-
- if (timeoutObjs.firstx() == timeoutObj) {
- synchronized (mux0) {
- mux0.notifyAll();
- }
- }
- }
/**
- * @param timeoutObj Timeout object to remove.
+ * @param recoveryDesc Recovery descriptor.
*/
- public void removeTimeoutObject(HandshakeTimeoutObject timeoutObj) {
- assert timeoutObj != null;
-
- timeoutObjs.remove(timeoutObj);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Socket timeout worker has been started.");
+ private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
+ ClusterNode node = recoveryDesc.node();
- while (!isInterrupted()) {
- long now = U.currentTimeMillis();
-
- for (Iterator<HandshakeTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
- HandshakeTimeoutObject timeoutObj = iter.next();
-
- if (timeoutObj.endTime() <= now) {
- iter.remove();
+ if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+ return;
- timeoutObj.onTimeout();
- }
- else
- break;
- }
-
- synchronized (mux0) {
- while (true) {
- // Access of the first element must be inside of
- // synchronization block, so we don't miss out
- // on thread notification events sent from
- // 'addTimeoutObject(..)' method.
- HandshakeTimeoutObject first = timeoutObjs.firstx();
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
- if (first != null) {
- long waitTime = first.endTime() - U.currentTimeMillis();
+ GridCommunicationClient client = reserveClient(node);
- if (waitTime > 0)
- mux0.wait(waitTime);
- else
- break;
- }
- else
- mux0.wait(5000);
- }
- }
+ client.release();
}
- }
- }
-
- /**
- *
- */
- private class RecoveryWorker extends IgniteSpiThread {
- /** */
- private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
-
- /**
- *
- */
- private RecoveryWorker() {
- super(gridName, "tcp-comm-recovery-worker", log);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Recovery worker has been started.");
-
- while (!isInterrupted()) {
- GridNioRecoveryDescriptor recoveryDesc = q.take();
-
- assert recoveryDesc != null;
-
- ClusterNode node = recoveryDesc.node();
-
- if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
- continue;
-
- try {
+ catch (IgniteCheckedException | IgniteException e) {
+ if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
if (log.isDebugEnabled())
- log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
-
- GridCommunicationClient client = reserveClient(node);
+ log.debug("Recovery reconnect failed, will retry " +
+ "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
- client.release();
+ addReconnectRequest(recoveryDesc);
}
- catch (IgniteCheckedException e) {
- if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect failed, will retry " +
- "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
-
- addReconnectRequest(recoveryDesc);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Recovery reconnect failed, " +
- "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
-
- onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
- e);
- }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Recovery reconnect failed, " +
+ "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+ onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
+ e);
}
}
}
@@ -2497,12 +2280,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
*
*/
- private static class HandshakeTimeoutObject<T> {
- /** */
- private static final AtomicLong idGen = new AtomicLong();
-
+ private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject {
/** */
- private final long id = idGen.incrementAndGet();
+ private final IgniteUuid id = IgniteUuid.randomUuid();
/** */
private final T obj;
@@ -2533,34 +2313,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return done.compareAndSet(false, true);
}
- /**
- * @return {@code True} if object has not yet been canceled.
- */
- boolean onTimeout() {
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
if (done.compareAndSet(false, true)) {
// Close socket - timeout occurred.
if (obj instanceof GridCommunicationClient)
((GridCommunicationClient)obj).forceClose();
else
U.closeQuiet((AbstractInterruptibleChannel)obj);
-
- return true;
}
-
- return false;
}
- /**
- * @return End time.
- */
- long endTime() {
+ /** {@inheritDoc} */
+ @Override public long endTime() {
return endTime;
}
- /**
- * @return ID.
- */
- long id() {
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
return id;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 5c80e6e..6f5a738 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -171,8 +171,6 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
* This frequency defines how often system will advice to flush
* connection buffer.
* <p>
- * If not provided, default value is {@link TcpCommunicationSpi#DFLT_CONN_BUF_FLUSH_FREQ}.
- * <p>
* This property is used only if {@link #getConnectionBufferSize()} is greater than {@code 0}.
*
* @param connBufFlushFreq Flush frequency.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 7560999..b952087 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -17,12 +17,13 @@
package org.apache.ignite.spi.discovery;
+import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.*;
import org.jetbrains.annotations.*;
-import java.io.*;
import java.util.*;
/**
@@ -102,7 +103,7 @@ public interface DiscoverySpi extends IgniteSpi {
*
* @param exchange Discovery data exchange handler.
*/
- public void setDataExchange(DiscoverySpiDataExchange exchange);
+ public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange);
/**
* Sets discovery metrics provider. Use metrics provided by
@@ -111,7 +112,7 @@ public interface DiscoverySpi extends IgniteSpi {
*
* @param metricsProvider Provider of metrics data.
*/
- public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
+ public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
/**
* Tells discovery SPI to disconnect from topology. This is very close to calling
@@ -141,9 +142,10 @@ public interface DiscoverySpi extends IgniteSpi {
/**
* Sends custom message across the ring.
- * @param evt Event.
+ * @param msg Custom message.
+ * @throws IgniteException if failed to marshal evt.
*/
- public void sendCustomEvent(Serializable evt);
+ public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException;
/**
* Initiates failure of provided node.
@@ -151,4 +153,12 @@ public interface DiscoverySpi extends IgniteSpi {
* @param nodeId Node ID.
*/
public void failNode(UUID nodeId);
+
+ /**
+ * Whether or not discovery is started in client mode.
+ *
+ * @return {@code true} if node is in client mode.
+ * @throws IllegalStateException If discovery SPI has not started.
+ */
+ public boolean isClientMode() throws IllegalStateException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
new file mode 100644
index 0000000..15e943b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Message to send across ring.
+ *
+ * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent(
+ * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage)
+ */
+public interface DiscoverySpiCustomMessage extends Serializable {
+ /**
+ * Called when message passed the ring.
+ */
+ @Nullable public DiscoverySpiCustomMessage ackMessage();
+
+ /**
+ * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
+ */
+ public boolean isMutable();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
index 7f17fe4..f46869d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
@@ -18,10 +18,9 @@
package org.apache.ignite.spi.discovery;
import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.*;
import org.jetbrains.annotations.*;
-import java.io.*;
import java.util.*;
/**
@@ -47,5 +46,5 @@ public interface DiscoverySpiListener {
ClusterNode node,
Collection<ClusterNode> topSnapshot,
@Nullable Map<Long, Collection<ClusterNode>> topHist,
- @Nullable Serializable data);
+ @Nullable DiscoverySpiCustomMessage data);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
new file mode 100644
index 0000000..d064c8d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -0,0 +1,1478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+
+/**
+ *
+ */
+class ClientImpl extends TcpDiscoveryImpl {
+ /** */
+ private static final Object JOIN_TIMEOUT = "JOIN_TIMEOUT";
+
+ /** */
+ private static final Object SPI_STOP = "SPI_STOP";
+
+ /** */
+ private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
+
+ /** Remote nodes. */
+ private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
+
+ /** Topology history. */
+ private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
+ /** Remote nodes. */
+ private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>();
+
+ /** Socket writer. */
+ private SocketWriter sockWriter;
+
+ /** */
+ private SocketReader sockReader;
+
+ /** */
+ private boolean segmented;
+
+ /** Last message ID. */
+ private volatile IgniteUuid lastMsgId;
+
+ /** Current topology version. */
+ private volatile long topVer;
+
+ /** Join error. Contains error what occurs on join process. */
+ private IgniteSpiException joinErr;
+
+ /** Joined latch. */
+ private final CountDownLatch joinLatch = new CountDownLatch(1);
+
+ /** Left latch. */
+ private final CountDownLatch leaveLatch = new CountDownLatch(1);
+
+ /** */
+ private final Timer timer = new Timer("TcpDiscoverySpi.timer");
+
+ /** */
+ protected MessageWorker msgWorker;
+
+ /**
+ * @param adapter Adapter.
+ */
+ ClientImpl(TcpDiscoverySpi adapter) {
+ super(adapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dumpDebugInfo(IgniteLogger log) {
+ StringBuilder b = new StringBuilder(U.nl());
+
+ b.append(">>>").append(U.nl());
+ b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl());
+ b.append(">>>").append(U.nl());
+
+ b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl());
+ b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl());
+
+ b.append("Internal threads: ").append(U.nl());
+
+ b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
+ b.append(" Socket reader: ").append(threadStatus(sockReader)).append(U.nl());
+ b.append(" Socket writer: ").append(threadStatus(sockWriter)).append(U.nl());
+
+ b.append(U.nl());
+
+ b.append("Nodes: ").append(U.nl());
+
+ for (ClusterNode node : allVisibleNodes())
+ b.append(" ").append(node.id()).append(U.nl());
+
+ b.append(U.nl());
+
+ b.append("Stats: ").append(spi.stats).append(U.nl());
+
+ U.quietAndInfo(log, b.toString());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getSpiState() {
+
+ if (sockWriter.isOnline())
+ return "connected";
+
+ return "disconnected";
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMessageWorkerQueueSize() {
+ return msgWorker.queueSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID getCoordinator() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+ spi.initLocalNode(0, true);
+
+ locNode = spi.locNode;
+
+ sockWriter = new SocketWriter();
+ sockWriter.start();
+
+ sockReader = new SocketReader();
+ sockReader.start();
+
+ msgWorker = new MessageWorker();
+ msgWorker.start();
+
+ if (spi.ipFinder.isShared())
+ registerLocalNodeAddress();
+
+ try {
+ joinLatch.await();
+
+ if (joinErr != null)
+ throw joinErr;
+ }
+ catch (InterruptedException e) {
+ throw new IgniteSpiException("Thread has been interrupted.", e);
+ }
+
+ timer.schedule(new HeartbeatSender(), spi.hbFreq, spi.hbFreq);
+
+ spi.printStartInfo();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStop() throws IgniteSpiException {
+ timer.cancel();
+
+ if (msgWorker != null && msgWorker.isAlive()) { // Should always be alive
+ msgWorker.addMessage(SPI_STOP);
+
+ try {
+ if (!leaveLatch.await(spi.netTimeout, MILLISECONDS))
+ U.warn(log, "Failed to left node: timeout [nodeId=" + locNode + ']');
+ }
+ catch (InterruptedException ignored) {
+
+ }
+ }
+
+ for (GridFutureAdapter<Boolean> fut : pingFuts.values())
+ fut.onDone(false);
+
+ rmtNodes.clear();
+
+ U.interrupt(msgWorker);
+ U.interrupt(sockWriter);
+ U.interrupt(sockReader);
+
+ U.join(msgWorker, log);
+ U.join(sockWriter, log);
+ U.join(sockReader, log);
+
+ spi.printStopInfo();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> getRemoteNodes() {
+ return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+ if (getLocalNodeId().equals(nodeId))
+ return locNode;
+
+ TcpDiscoveryNode node = rmtNodes.get(nodeId);
+
+ return node != null && node.visible() ? node : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean pingNode(@NotNull final UUID nodeId) {
+ if (nodeId.equals(getLocalNodeId()))
+ return true;
+
+ TcpDiscoveryNode node = rmtNodes.get(nodeId);
+
+ if (node == null || !node.visible())
+ return false;
+
+ GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId);
+
+ if (fut == null) {
+ fut = new GridFutureAdapter<>();
+
+ GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut);
+
+ if (oldFut != null)
+ fut = oldFut;
+ else {
+ if (spi.getSpiContext().isStopping()) {
+ if (pingFuts.remove(nodeId, fut))
+ fut.onDone(false);
+
+ return false;
+ }
+
+ final GridFutureAdapter<Boolean> finalFut = fut;
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ if (pingFuts.remove(nodeId, finalFut))
+ finalFut.onDone(false);
+ }
+ }, spi.netTimeout);
+
+ sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+ }
+ }
+
+ try {
+ return fut.get();
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ return false;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException(e); // Should newer occur
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void disconnect() throws IgniteSpiException {
+ U.interrupt(msgWorker);
+ U.interrupt(sockWriter);
+ U.interrupt(sockReader);
+
+ U.join(msgWorker, log);
+ U.join(sockWriter, log);
+ U.join(sockReader, log);
+
+ leaveLatch.countDown();
+ joinLatch.countDown();
+
+ spi.getSpiContext().deregisterPorts();
+
+ Collection<ClusterNode> rmts = getRemoteNodes();
+
+ // This is restart/disconnection and remote nodes are not empty.
+ // We need to fire FAIL event for each.
+ DiscoverySpiListener lsnr = spi.lsnr;
+
+ if (lsnr != null) {
+ for (ClusterNode n : rmts) {
+ rmtNodes.remove(n.id());
+
+ Collection<ClusterNode> top = updateTopologyHistory(topVer + 1);
+
+ lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null);
+ }
+ }
+
+ rmtNodes.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
+ if (segmented)
+ throw new IgniteException("Failed to send custom message: client is disconnected");
+
+ try {
+ sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
+ spi.marsh.marshal(evt)));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void failNode(UUID nodeId) {
+ ClusterNode node = rmtNodes.get(nodeId);
+
+ if (node != null) {
+ TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
+ node.id(), node.order());
+
+ msgWorker.addMessage(msg);
+ }
+ }
+
+ /**
+ * @return Opened socket or {@code null} if timeout.
+ * @see TcpDiscoverySpi#joinTimeout
+ */
+ @SuppressWarnings("BusyWait")
+ @Nullable private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException {
+ Collection<InetSocketAddress> addrs = null;
+
+ long startTime = U.currentTimeMillis();
+
+ while (true) {
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException();
+
+ while (addrs == null || addrs.isEmpty()) {
+ addrs = spi.resolvedAddresses();
+
+ if (!F.isEmpty(addrs)) {
+ if (log.isDebugEnabled())
+ log.debug("Resolved addresses from IP finder: " + addrs);
+ }
+ else {
+ U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder);
+
+ if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
+ return null;
+
+ Thread.sleep(2000);
+ }
+ }
+
+ Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
+
+ Iterator<InetSocketAddress> it = addrs.iterator();
+
+ while (it.hasNext()) {
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException();
+
+ InetSocketAddress addr = it.next();
+
+ Socket sock = null;
+
+ try {
+ long ts = U.currentTimeMillis();
+
+ IgniteBiTuple<Socket, UUID> t = initConnection(addr);
+
+ sock = t.get1();
+
+ UUID rmtNodeId = t.get2();
+
+ spi.stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
+
+ locNode.clientRouterNodeId(rmtNodeId);
+
+ TcpDiscoveryAbstractMessage msg = recon ?
+ new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId,
+ lastMsgId) :
+ new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId()));
+
+ msg.client(true);
+
+ spi.writeToSocket(sock, msg);
+
+ int res = spi.readReceipt(sock, spi.ackTimeout);
+
+ switch (res) {
+ case RES_OK:
+ return sock;
+
+ case RES_CONTINUE_JOIN:
+ case RES_WAIT:
+ U.closeQuiet(sock);
+
+ break;
+
+ default:
+ if (log.isDebugEnabled())
+ log.debug("Received unexpected response to join request: " + res);
+
+ U.closeQuiet(sock);
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Failed to establish connection with address: " + addr, e);
+
+ U.closeQuiet(sock);
+
+ it.remove();
+ }
+ }
+
+ if (addrs.isEmpty()) {
+ U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
+ "in 2000ms): " + addrs0);
+
+ if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout)
+ return null;
+
+ Thread.sleep(2000);
+ }
+ }
+ }
+
+ /**
+ * @param topVer New topology version.
+ * @return Latest topology snapshot.
+ */
+ private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) {
+ this.topVer = topVer;
+
+ NavigableSet<ClusterNode> allNodes = allVisibleNodes();
+
+ if (!topHist.containsKey(topVer)) {
+ assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+ "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+
+ topHist.put(topVer, allNodes);
+
+ if (topHist.size() > spi.topHistSize)
+ topHist.pollFirstEntry();
+
+ assert topHist.lastKey() == topVer;
+ assert topHist.size() <= spi.topHistSize;
+ }
+
+ return allNodes;
+ }
+
+ /**
+ * @return All nodes.
+ */
+ private NavigableSet<ClusterNode> allVisibleNodes() {
+ NavigableSet<ClusterNode> allNodes = new TreeSet<>();
+
+ for (TcpDiscoveryNode node : rmtNodes.values()) {
+ if (node.visible())
+ allNodes.add(node);
+ }
+
+ allNodes.add(locNode);
+
+ return allNodes;
+ }
+
+ /**
+ * @param addr Address.
+ * @return Remote node ID.
+ * @throws IOException In case of I/O error.
+ * @throws IgniteCheckedException In case of other error.
+ */
+ private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, IgniteCheckedException {
+ assert addr != null;
+
+ Socket sock = spi.openSocket(addr);
+
+ TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(getLocalNodeId());
+
+ req.client(true);
+
+ spi.writeToSocket(sock, req);
+
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, spi.ackTimeout);
+
+ UUID nodeId = res.creatorNodeId();
+
+ assert nodeId != null;
+ assert !getLocalNodeId().equals(nodeId);
+
+ return F.t(sock, nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override void simulateNodeFailure() {
+ U.warn(log, "Simulating client node failure: " + getLocalNodeId());
+
+ U.interrupt(sockWriter);
+ U.interrupt(msgWorker);
+
+ U.join(sockWriter, log);
+ U.join(msgWorker, log);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void brakeConnection() {
+ U.closeQuiet(msgWorker.currSock);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteSpiThread workerThread() {
+ return msgWorker;
+ }
+
+ /**
+ * FOR TEST PURPOSE ONLY!
+ */
+ @SuppressWarnings("BusyWait")
+ public void waitForClientMessagePrecessed() {
+ Object last = msgWorker.queue.peekLast();
+
+ while (last != null && msgWorker.isAlive() && msgWorker.queue.contains(last)) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Heartbeat sender.
+ */
+ private class HeartbeatSender extends TimerTask {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ if (!spi.getSpiContext().isStopping() && sockWriter.isOnline()) {
+ TcpDiscoveryClientHeartbeatMessage msg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(),
+ spi.metricsProvider.metrics());
+
+ msg.client(true);
+
+ sockWriter.sendMessage(msg);
+ }
+ }
+ }
+
+ /**
+ * Socket reader.
+ */
+ private class SocketReader extends IgniteSpiThread {
+ /** */
+ private final Object mux = new Object();
+
+ /** */
+ private Socket sock;
+
+ /** */
+ private UUID rmtNodeId;
+
+ /**
+ */
+ protected SocketReader() {
+ super(spi.ignite().name(), "tcp-client-disco-sock-reader", log);
+ }
+
+ /**
+ * @param sock Socket.
+ * @param rmtNodeId Rmt node id.
+ */
+ public void setSocket(Socket sock, UUID rmtNodeId) {
+ synchronized (mux) {
+ this.sock = sock;
+
+ this.rmtNodeId = rmtNodeId;
+
+ mux.notifyAll();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ while (!isInterrupted()) {
+ Socket sock;
+ UUID rmtNodeId;
+
+ synchronized (mux) {
+ if (this.sock == null) {
+ mux.wait();
+
+ continue;
+ }
+
+ sock = this.sock;
+ rmtNodeId = this.rmtNodeId;
+ }
+
+ try {
+ InputStream in = new BufferedInputStream(sock.getInputStream());
+
+ sock.setKeepAlive(true);
+ sock.setTcpNoDelay(true);
+
+ while (!isInterrupted()) {
+ TcpDiscoveryAbstractMessage msg;
+
+ try {
+ msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Failed to read message [sock=" + sock + ", " +
+ "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e);
+
+ IOException ioEx = X.cause(e, IOException.class);
+
+ if (ioEx != null)
+ throw ioEx;
+
+ ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
+
+ if (clsNotFoundEx != null)
+ LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
+ "(make sure same versions of all classes are available on all nodes) " +
+ "[rmtNodeId=" + rmtNodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
+ else
+ LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" +
+ getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']');
+
+ continue;
+ }
+
+ msg.senderNodeId(rmtNodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Message has been received: " + msg);
+
+ spi.stats.onMessageReceived(msg);
+
+ if (spi.ensured(msg))
+ lastMsgId = msg.id();
+
+ msgWorker.addMessage(msg);
+ }
+ }
+ catch (IOException e) {
+ msgWorker.addMessage(new SocketClosedMessage(sock));
+
+ if (log.isDebugEnabled())
+ U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
+ }
+ finally {
+ U.closeQuiet(sock);
+
+ synchronized (mux) {
+ if (this.sock == sock) {
+ this.sock = null;
+ this.rmtNodeId = null;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private class SocketWriter extends IgniteSpiThread {
+ /** */
+ private final Object mux = new Object();
+
+ /** */
+ private Socket sock;
+
+ /** */
+ private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
+
+ /**
+ *
+ */
+ protected SocketWriter() {
+ super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void sendMessage(TcpDiscoveryAbstractMessage msg) {
+ synchronized (mux) {
+ queue.add(msg);
+
+ mux.notifyAll();
+ }
+ }
+
+ /**
+ * @param sock Socket.
+ */
+ private void setSocket(Socket sock) {
+ synchronized (mux) {
+ this.sock = sock;
+
+ mux.notifyAll();
+ }
+ }
+
+ /**
+ *
+ */
+ public boolean isOnline() {
+ synchronized (mux) {
+ return sock != null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ TcpDiscoveryAbstractMessage msg = null;
+
+ while (!Thread.currentThread().isInterrupted()) {
+ Socket sock;
+
+ synchronized (mux) {
+ sock = this.sock;
+
+ if (sock == null) {
+ mux.wait();
+
+ continue;
+ }
+
+ if (msg == null)
+ msg = queue.poll();
+
+ if (msg == null) {
+ mux.wait();
+
+ continue;
+ }
+ }
+
+ for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs)
+ msgLsnr.apply(msg);
+
+ try {
+ spi.writeToSocket(sock, msg);
+
+ msg = null;
+ }
+ catch (IOException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Failed to send node left message (will stop anyway) [sock=" + sock + ']', e);
+
+ U.closeQuiet(sock);
+
+ synchronized (mux) {
+ if (sock == this.sock)
+ this.sock = null; // Connection has dead.
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send message: " + msg, e);
+
+ msg = null;
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private class Reconnector extends IgniteSpiThread {
+ /** */
+ private volatile Socket sock;
+
+ /**
+ *
+ */
+ protected Reconnector() {
+ super(spi.ignite().name(), "tcp-client-disco-msg-worker", log);
+ }
+
+ /**
+ *
+ */
+ public void cancel() {
+ interrupt();
+
+ U.closeQuiet(sock);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ assert !segmented;
+
+ boolean success = false;
+
+ try {
+ sock = joinTopology(true);
+
+ if (sock == null) {
+ U.error(log, "Failed to reconnect to cluster: timeout.");
+
+ return;
+ }
+
+ if (isInterrupted())
+ throw new InterruptedException();
+
+ InputStream in = new BufferedInputStream(sock.getInputStream());
+
+ sock.setKeepAlive(true);
+ sock.setTcpNoDelay(true);
+
+ // Wait for
+ while (!isInterrupted()) {
+ TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+
+ if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+ TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
+
+ if (res.creatorNodeId().equals(getLocalNodeId())) {
+ if (res.success()) {
+ msgWorker.addMessage(res);
+
+ success = true;
+ }
+
+ break;
+ }
+ }
+
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ U.error(log, "Failed to reconnect", e);
+ }
+ finally {
+ if (!success) {
+ U.closeQuiet(sock);
+
+ msgWorker.addMessage(SPI_RECONNECT_FAILED);
+ }
+ }
+ }
+ }
+
+ /**
+ * Message worker.
+ */
+ protected class MessageWorker extends IgniteSpiThread {
+ /** Message queue. */
+ private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
+
+ /** */
+ private Socket currSock;
+
+ /** Indicates that pending messages are currently processed. */
+ private boolean pending;
+
+ /** */
+ private Reconnector reconnector;
+
+ /**
+ *
+ */
+ private MessageWorker() {
+ super(spi.ignite().name(), "tcp-client-disco-msg-worker", log);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("InfiniteLoopStatement")
+ @Override protected void body() throws InterruptedException {
+ spi.stats.onJoinStarted();
+
+ try {
+ final Socket sock = joinTopology(false);
+
+ if (sock == null) {
+ joinErr = new IgniteSpiException("Join process timed out");
+
+ joinLatch.countDown();
+
+ return;
+ }
+
+ currSock = sock;
+
+ sockWriter.setSocket(sock);
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ if (joinLatch.getCount() > 0)
+ queue.add(JOIN_TIMEOUT);
+ }
+ }, spi.netTimeout);
+
+ sockReader.setSocket(sock, locNode.clientRouterNodeId());
+
+ while (true) {
+ Object msg = queue.take();
+
+ if (msg == JOIN_TIMEOUT) {
+ if (joinLatch.getCount() > 0) {
+ joinErr = new IgniteSpiException("Join process timed out [sock=" + sock +
+ ", timeout=" + spi.netTimeout + ']');
+
+ joinLatch.countDown();
+
+ break;
+ }
+ }
+ else if (msg == SPI_STOP) {
+ assert spi.getSpiContext().isStopping();
+
+ if (currSock != null) {
+ TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
+
+ leftMsg.client(true);
+
+ sockWriter.sendMessage(leftMsg);
+ }
+ else
+ leaveLatch.countDown();
+ }
+ else if (msg instanceof SocketClosedMessage) {
+ if (((SocketClosedMessage)msg).sock == currSock) {
+ currSock = null;
+
+ if (joinLatch.getCount() > 0) {
+ joinErr = new IgniteSpiException("Failed to connect to cluster: socket closed.");
+
+ joinLatch.countDown();
+
+ break;
+ }
+ else {
+ if (spi.getSpiContext().isStopping() || segmented)
+ leaveLatch.countDown();
+ else {
+ assert reconnector == null;
+
+ final Reconnector reconnector = new Reconnector();
+ this.reconnector = reconnector;
+ reconnector.start();
+
+ timer.schedule(new TimerTask() {
+ @Override public void run() {
+ if (reconnector.isAlive())
+ reconnector.cancel();
+ }
+ }, spi.netTimeout);
+ }
+ }
+ }
+ }
+ else if (msg == SPI_RECONNECT_FAILED) {
+ if (!segmented) {
+ segmented = true;
+
+ reconnector.cancel();
+ reconnector.join();
+
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+ }
+ }
+ else {
+ TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
+
+ if (joinLatch.getCount() > 0) {
+ IgniteSpiException err = null;
+
+ if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage)
+ err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+ else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage)
+ err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+ else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage)
+ err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+
+ if (err != null) {
+ joinErr = err;
+
+ joinLatch.countDown();
+
+ break;
+ }
+ }
+
+ processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg);
+ }
+ }
+ }
+ finally {
+ U.closeQuiet(currSock);
+
+ if (joinLatch.getCount() > 0) {
+ // This should not occurs.
+ joinErr = new IgniteSpiException("Some error occurs in joinig process");
+
+ joinLatch.countDown();
+ }
+
+ if (reconnector != null) {
+ reconnector.cancel();
+
+ reconnector.join();
+ }
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) {
+ assert msg != null;
+ assert msg.verified() || msg.senderNodeId() == null;
+
+ spi.stats.onMessageProcessingStarted(msg);
+
+ if (msg instanceof TcpDiscoveryNodeAddedMessage)
+ processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+ processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
+ else if (msg instanceof TcpDiscoveryNodeLeftMessage)
+ processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
+ else if (msg instanceof TcpDiscoveryNodeFailedMessage)
+ processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
+ else if (msg instanceof TcpDiscoveryHeartbeatMessage)
+ processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+ else if (msg instanceof TcpDiscoveryClientReconnectMessage)
+ processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+ else if (msg instanceof TcpDiscoveryCustomEventMessage)
+ processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+ else if (msg instanceof TcpDiscoveryClientPingResponse)
+ processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
+ else if (msg instanceof TcpDiscoveryPingRequest)
+ processPingRequest();
+
+ spi.stats.onMessageProcessingFinished(msg);
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
+ if (spi.getSpiContext().isStopping())
+ return;
+
+ TcpDiscoveryNode node = msg.node();
+
+ UUID newNodeId = node.id();
+
+ if (getLocalNodeId().equals(newNodeId)) {
+ if (joinLatch.getCount() > 0) {
+ Collection<TcpDiscoveryNode> top = msg.topology();
+
+ if (top != null) {
+ spi.gridStartTime = msg.gridStartTime();
+
+ for (TcpDiscoveryNode n : top) {
+ if (n.order() > 0)
+ n.visible(true);
+
+ rmtNodes.put(n.id(), n);
+ }
+
+ topHist.clear();
+
+ if (msg.topologyHistory() != null)
+ topHist.putAll(msg.topologyHistory());
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Discarding node added message with empty topology: " + msg);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Discarding node added message (this message has already been processed) " +
+ "[msg=" + msg + ", locNode=" + locNode + ']');
+ }
+ else {
+ boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;
+
+ if (topChanged) {
+ if (log.isDebugEnabled())
+ log.debug("Added new node to topology: " + node);
+
+ Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+
+ if (data != null)
+ spi.onExchange(newNodeId, newNodeId, data, null);
+ }
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+ if (spi.getSpiContext().isStopping())
+ return;
+
+ if (getLocalNodeId().equals(msg.nodeId())) {
+ if (joinLatch.getCount() > 0) {
+ Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
+
+ if (dataMap != null) {
+ for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
+ spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(), null);
+ }
+
+ locNode.setAttributes(msg.clientNodeAttributes());
+ locNode.visible(true);
+
+ long topVer = msg.topologyVersion();
+
+ locNode.order(topVer);
+
+ notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer));
+
+ joinErr = null;
+
+ joinLatch.countDown();
+
+ spi.stats.onJoinFinished();
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Discarding node add finished message (this message has already been processed) " +
+ "[msg=" + msg + ", locNode=" + locNode + ']');
+ }
+ else {
+ TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
+
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
+
+ return;
+ }
+
+ long topVer = msg.topologyVersion();
+
+ node.order(topVer);
+ node.visible(true);
+
+ if (spi.locNodeVer.equals(node.version()))
+ node.version(spi.locNodeVer);
+
+ NavigableSet<ClusterNode> top = updateTopologyHistory(topVer);
+
+ if (!pending && joinLatch.getCount() > 0) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node add finished message (join process is not finished): " + msg);
+
+ return;
+ }
+
+ notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
+
+ spi.stats.onNodeJoined();
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
+ if (getLocalNodeId().equals(msg.creatorNodeId())) {
+ if (log.isDebugEnabled())
+ log.debug("Received node left message for local node: " + msg);
+
+ leaveLatch.countDown();
+ }
+ else {
+ if (spi.getSpiContext().isStopping())
+ return;
+
+ TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
+
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
+
+ return;
+ }
+
+ NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+
+ if (!pending && joinLatch.getCount() > 0) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node left message (join process is not finished): " + msg);
+
+ return;
+ }
+
+ notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
+
+ spi.stats.onNodeLeft();
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
+ if (spi.getSpiContext().isStopping()) {
+ if (!getLocalNodeId().equals(msg.creatorNodeId()) && getLocalNodeId().equals(msg.failedNodeId())) {
+ if (leaveLatch.getCount() > 0) {
+ log.debug("Remote node fail this node while node is stopping [locNode=" + getLocalNodeId()
+ + ", rmtNode=" + msg.creatorNodeId() + ']');
+
+ leaveLatch.countDown();
+ }
+ }
+
+ return;
+ }
+
+ if (!getLocalNodeId().equals(msg.creatorNodeId())) {
+ TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
+
+ if (node == null) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
+
+ return;
+ }
+
+ NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+
+ if (!pending && joinLatch.getCount() > 0) {
+ if (log.isDebugEnabled())
+ log.debug("Discarding node failed message (join process is not finished): " + msg);
+
+ return;
+ }
+
+ notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
+
+ spi.stats.onNodeFailed();
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
+ if (spi.getSpiContext().isStopping())
+ return;
+
+ if (getLocalNodeId().equals(msg.creatorNodeId())) {
+ assert msg.senderNodeId() != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Received heartbeat response: " + msg);
+ }
+ else {
+ long tstamp = U.currentTimeMillis();
+
+ if (msg.hasMetrics()) {
+ for (Map.Entry<UUID, TcpDiscoveryHeartbeatMessage.MetricsSet> e : msg.metrics().entrySet()) {
+ UUID nodeId = e.getKey();
+
+ TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue();
+
+ Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ?
+ msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
+
+ updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp);
+
+ for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
+ updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+ if (spi.getSpiContext().isStopping())
+ return;
+
+ if (getLocalNodeId().equals(msg.creatorNodeId())) {
+ assert msg.success();
+
+ currSock = reconnector.sock;
+
+ sockWriter.setSocket(currSock);
+ sockReader.setSocket(currSock, locNode.clientRouterNodeId());
+
+ reconnector = null;
+
+ pending = true;
+
+ try {
+ for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages())
+ processDiscoveryMessage(pendingMsg);
+ }
+ finally {
+ pending = false;
+ }
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Discarding reconnect message for another client: " + msg);
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+ if (msg.verified() && joinLatch.getCount() == 0) {
+ DiscoverySpiListener lsnr = spi.lsnr;
+
+ if (lsnr != null) {
+ UUID nodeId = msg.creatorNodeId();
+
+ TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
+
+ if (node != null && node.visible()) {
+ try {
+ DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh);
+
+ notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to unmarshal discovery custom message.", e);
+ }
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Received metrics from unknown node: " + nodeId);
+ }
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) {
+ GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing());
+
+ if (fut != null)
+ fut.onDone(msg.result());
+ }
+
+ /**
+ * Router want to ping this client.
+ */
+ private void processPingRequest() {
+ TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(getLocalNodeId());
+
+ res.client(true);
+
+ sockWriter.sendMessage(res);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param metrics Metrics.
+ * @param cacheMetrics Cache metrics.
+ * @param tstamp Timestamp.
+ */
+ private void updateMetrics(UUID nodeId,
+ ClusterMetrics metrics,
+ Map<Integer, CacheMetrics> cacheMetrics,
+ long tstamp)
+ {
+ assert nodeId != null;
+ assert metrics != null;
+ assert cacheMetrics != null;
+
+ TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
+
+ if (node != null && node.visible()) {
+ node.setMetrics(metrics);
+ node.setCacheMetrics(cacheMetrics);
+
+ node.lastUpdateTime(tstamp);
+
+ notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes());
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Received metrics from unknown node: " + nodeId);
+ }
+
+ /**
+ * @param type Event type.
+ * @param topVer Topology version.
+ * @param node Node.
+ * @param top Topology snapshot.
+ */
+ private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) {
+ notifyDiscovery(type, topVer, node, top, null);
+ }
+
+ /**
+ * @param type Event type.
+ * @param topVer Topology version.
+ * @param node Node.
+ * @param top Topology snapshot.
+ */
+ private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top,
+ @Nullable DiscoverySpiCustomMessage data) {
+ DiscoverySpiListener lsnr = spi.lsnr;
+
+ if (lsnr != null) {
+ if (log.isDebugEnabled())
+ log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+ ", topVer=" + topVer + ']');
+
+ lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), data);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+ ", topVer=" + topVer + ']');
+ }
+
+ /**
+ * @param msg Message.
+ */
+ public void addMessage(Object msg) {
+ queue.add(msg);
+ }
+
+ /**
+ *
+ */
+ public int queueSize() {
+ return queue.size();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class SocketClosedMessage {
+ /** */
+ private final Socket sock;
+
+ /**
+ * @param sock Socket.
+ */
+ private SocketClosedMessage(Socket sock) {
+ this.sock = sock;
+ }
+ }
+}