You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 15:07:39 UTC
[38/52] [abbrv] incubator-ignite git commit: # Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
new file mode 100644
index 0000000..955a305
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
@@ -0,0 +1,36 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.util.*;
+
+/**
+ * Handler for the initial data exchange between GridGain nodes. The data exchange
+ * is initiated by a new node when it tries to join the topology and finishes
+ * before the node actually joins.
+ */
+public interface DiscoverySpiDataExchange {
+ /**
+ * Collects data from all components. This method is called both
+ * on the new node that joins the topology, to transfer its data to the existing
+ * nodes, and on all existing nodes, to transfer their data to the new node.
+ *
+ * @param nodeId ID of the new node that joins the topology.
+ * @return Collection of discovery data objects from different components.
+ */
+ public List<Object> collect(UUID nodeId);
+
+ /**
+ * Notifies the discovery manager about data received from a remote node.
+ *
+ * @param data Collection of discovery data objects from different components.
+ */
+ public void onExchange(List<Object> data);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java
new file mode 100644
index 0000000..cd6ecf9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiHistorySupport.java
@@ -0,0 +1,28 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.lang.annotation.*;
+
+/**
+ * This annotation is for all implementations of {@link DiscoverySpi} that support
+ * topology snapshots history. It is retained at runtime and applies to types only.
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface DiscoverySpiHistorySupport {
+ /**
+ * Whether or not the target SPI supports topology snapshots history.
+ */
+ @SuppressWarnings({"JavaDoc"})
+ public boolean value();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
new file mode 100644
index 0000000..acc0ee1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
@@ -0,0 +1,35 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import org.apache.ignite.cluster.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Listener for grid node discovery events. See
+ * {@link DiscoverySpi} for information on how grid nodes get discovered.
+ */
+public interface DiscoverySpiListener {
+ /**
+ * Notification for grid node discovery events.
+ *
+ * @param type Node discovery event type. See {@link org.apache.ignite.events.IgniteDiscoveryEvent}.
+ * @param topVer Topology version or {@code 0} if the configured discovery SPI implementation
+ * does not support versioning.
+ * @param node Node affected (e.g. newly joined node, left node, failed node or local node).
+ * @param topSnapshot Topology snapshot after the event has occurred (e.g. if the event is
+ * {@code EVT_NODE_JOINED}, then the joined node will be in the snapshot).
+ * @param topHist Topology snapshots history (may be {@code null}).
+ */
+ public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot,
+ @Nullable Map<Long, Collection<ClusterNode>> topHist);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java
new file mode 100644
index 0000000..dd9fc3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiNodeAuthenticator.java
@@ -0,0 +1,39 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.managers.security.*;
+import org.gridgain.grid.security.*;
+
+/**
+ * Node authenticator.
+ */
+public interface DiscoverySpiNodeAuthenticator {
+ /**
+ * Authenticates the given node using the provided security credentials.
+ *
+ * @param node Node to authenticate.
+ * @param cred Security credentials.
+ * @return Security context if authentication succeeded or {@code null} if authentication failed.
+ * @throws GridException If authentication process failed
+ * (invalid credentials should not lead to this exception).
+ */
+ public GridSecurityContext authenticateNode(ClusterNode node, GridSecurityCredentials cred) throws GridException;
+
+ /**
+ * Gets global node authentication flag.
+ *
+ * @return {@code True} if all nodes in topology should authenticate joining node, {@code false} if only
+ * coordinator should do the authentication.
+ */
+ public boolean isGlobalNodeAuthentication();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java
new file mode 100644
index 0000000..aee8b3f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiOrderSupport.java
@@ -0,0 +1,39 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import java.lang.annotation.*;
+
+/**
+ * This annotation is for all implementations of {@link DiscoverySpi} that support
+ * proper node ordering. This includes:
+ * <ul>
+ * <li>
+ * Every node gets an order number assigned to it which is provided via {@link org.apache.ignite.cluster.ClusterNode#order()}
+ * method. There is no requirement on the order value other than that nodes that join the grid
+ * at a later point in time have order values greater than previous nodes.
+ * </li>
+ * <li>
+ * All {@link org.apache.ignite.events.IgniteEventType#EVT_NODE_JOINED} events come in proper order. This means that all
+ * listeners to discovery events will receive discovery notifications in proper order.
+ * </li>
+ * </ul>
+ */
+@Documented
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface DiscoverySpiOrderSupport {
+ /**
+ * Whether or not the target SPI supports node startup order.
+ */
+ @SuppressWarnings({"JavaDoc"})
+ public boolean value();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html b/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html
new file mode 100644
index 0000000..77f45f1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/package.html
@@ -0,0 +1,15 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<!--
+ @html.file.header
+ _________ _____ __________________ _____
+ __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+-->
+<html>
+<body>
+ <!-- Package description. -->
+ Contains APIs for topology manager SPI.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
new file mode 100644
index 0000000..b474852
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -0,0 +1,1219 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
+
+/**
+ * Client discovery SPI implementation that uses TCP/IP for node discovery.
+ * <p>
+ * This discovery SPI requires at least one server node configured with
+ * {@link TcpDiscoverySpi}. It will try to connect to random IP taken from
+ * {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} which should point to one of these server
+ * nodes and will maintain connection only with this node (will not enter the ring).
+ * If this connection is broken, it will try to reconnect using addresses from
+ * the same IP finder.
+ */
+@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+@IgniteSpiMultipleInstancesSupport(true)
+@DiscoverySpiOrderSupport(true)
+@DiscoverySpiHistorySupport(true)
+public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpClientDiscoverySpiMBean {
+ /** Default disconnect check interval. */
+ public static final long DFLT_DISCONNECT_CHECK_INT = 2000;
+
+ /** Remote nodes. */
+ private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
+
+ /** Socket. */
+ private volatile Socket sock;
+
+ /** Socket reader. */
+ private volatile SocketReader sockRdr;
+
+ /** Heartbeat sender. */
+ private volatile HeartbeatSender hbSender;
+
+ /** Disconnect handler. */
+ private volatile DisconnectHandler disconnectHnd;
+
+ /** Last message ID. */
+ private volatile IgniteUuid lastMsgId;
+
+ /** Current topology version. */
+ private volatile long topVer;
+
+ /** Join error. */
+ private IgniteSpiException joinErr;
+
+ /** Whether reconnect failed. */
+ private boolean reconFailed;
+
+ /** Joined latch. */
+ private CountDownLatch joinLatch;
+
+ /** Left latch. */
+ private volatile CountDownLatch leaveLatch;
+
+ /** Disconnect check interval. */
+ private long disconnectCheckInt = DFLT_DISCONNECT_CHECK_INT;
+
+ /** {@inheritDoc} */
+ @Override public long getDisconnectCheckInterval() {
+ return disconnectCheckInt;
+ }
+
+ /**
+ * Sets disconnect check interval.
+ *
+ * @param disconnectCheckInt Disconnect check interval.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setDisconnectCheckInterval(long disconnectCheckInt) {
+ this.disconnectCheckInt = disconnectCheckInt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSocketTimeout() {
+ return sockTimeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getAckTimeout() {
+ return ackTimeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNetworkTimeout() {
+ return netTimeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getThreadPriority() {
+ return threadPri;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getHeartbeatFrequency() {
+ return hbFreq;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getIpFinderFormatted() {
+ return ipFinder.toString();
+ }
+
+    /** {@inheritDoc} */
+    @Override public int getMessageWorkerQueueSize() {
+        // Read the volatile field once so the null check and the dereference use the same reader.
+        SocketReader rdr = sockRdr;
+
+        if (rdr == null)
+            return 0;
+
+        return rdr.msgWrk.queueSize();
+    }
+
+ /** {@inheritDoc} */
+ @Override public long getNodesJoined() {
+ return stats.joinedNodesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNodesLeft() {
+ return stats.leftNodesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getNodesFailed() {
+ return stats.failedNodesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getAvgMessageProcessingTime() {
+ return stats.avgMessageProcessingTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMaxMessageProcessingTime() {
+ return stats.maxMessageProcessingTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalReceivedMessages() {
+ return stats.totalReceivedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Integer> getReceivedMessages() {
+ return stats.receivedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalProcessedMessages() {
+ return stats.totalProcessedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Integer> getProcessedMessages() {
+ return stats.processedMessages();
+ }
+
+    /**
+     * Starts the client discovery SPI. Validates the configuration, resolves the local host,
+     * registers the MBean, creates the local node, starts the socket timeout worker, joins
+     * the topology and finally starts the disconnect handler.
+     *
+     * @param gridName Grid name (may be {@code null}).
+     * @throws IgniteSpiException If the local address cannot be resolved or the SPI fails to start.
+     */
+    @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+        startStopwatch();
+
+        assertParameter(ipFinder != null, "ipFinder != null");
+        assertParameter(netTimeout > 0, "networkTimeout > 0");
+        assertParameter(sockTimeout > 0, "sockTimeout > 0");
+        assertParameter(ackTimeout > 0, "ackTimeout > 0");
+        assertParameter(hbFreq > 0, "heartbeatFreq > 0");
+        assertParameter(threadPri > 0, "threadPri > 0");
+
+        // The local host is resolved exactly once; the result is reused below.
+        try {
+            locHost = U.resolveLocalHost(locAddr);
+        }
+        catch (IOException e) {
+            throw new IgniteSpiException("Unknown local address: " + locAddr, e);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(configInfo("localHost", locHost.getHostAddress()));
+            log.debug(configInfo("threadPri", threadPri));
+            log.debug(configInfo("networkTimeout", netTimeout));
+            log.debug(configInfo("sockTimeout", sockTimeout));
+            log.debug(configInfo("ackTimeout", ackTimeout));
+            log.debug(configInfo("ipFinder", ipFinder));
+            log.debug(configInfo("heartbeatFreq", hbFreq));
+        }
+
+        // Warn on odd network timeout.
+        if (netTimeout < 3000)
+            U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
+
+        // Warn on odd heartbeat frequency.
+        if (hbFreq < 2000)
+            U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq);
+
+        registerMBean(gridName, this, TcpClientDiscoverySpiMBean.class);
+
+        // Multicast IP finder falls back to the SPI's local address if it has none of its own.
+        if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
+            TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder);
+
+            if (mcastIpFinder.getLocalAddress() == null)
+                mcastIpFinder.setLocalAddress(locAddr);
+        }
+
+        IgniteBiTuple<Collection<String>, Collection<String>> addrs;
+
+        try {
+            addrs = U.resolveLocalAddresses(locHost);
+        }
+        catch (IOException | GridException e) {
+            throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e);
+        }
+
+        locNode = new TcpDiscoveryNode(
+            locNodeId,
+            addrs.get1(),
+            addrs.get2(),
+            0,
+            metricsProvider,
+            locNodeVer);
+
+        locNode.setAttributes(locNodeAttrs);
+        locNode.local(true);
+
+        sockTimeoutWorker = new SocketTimeoutWorker();
+        sockTimeoutWorker.start();
+
+        joinTopology(false);
+
+        // The disconnect handler is started only after the initial join attempt has returned.
+        disconnectHnd = new DisconnectHandler();
+        disconnectHnd.start();
+
+        if (log.isDebugEnabled())
+            log.debug(startInfo());
+    }
+
+    /**
+     * Stops the SPI: clears remote nodes, stops the disconnect handler and heartbeat sender,
+     * sends a node-left message over the current socket (if any) and waits up to the network
+     * timeout for it to come back, then stops the socket reader and the socket timeout worker
+     * and unregisters the MBean.
+     *
+     * @throws IgniteSpiException Declared by the SPI contract.
+     */
+    @Override public void spiStop() throws IgniteSpiException {
+        rmtNodes.clear();
+
+        U.interrupt(disconnectHnd);
+        U.join(disconnectHnd, log);
+
+        U.interrupt(hbSender);
+        U.join(hbSender, log);
+
+        // Detach the socket from the SPI before using it for the leave message.
+        Socket closing = sock;
+
+        sock = null;
+
+        if (closing != null) {
+            leaveLatch = new CountDownLatch(1);
+
+            try {
+                TcpDiscoveryNodeLeftMessage leftMsg = new TcpDiscoveryNodeLeftMessage(locNodeId);
+
+                leftMsg.client(true);
+
+                writeToSocket(closing, leftMsg);
+
+                boolean acked = U.await(leaveLatch, netTimeout, MILLISECONDS);
+
+                if (!acked && log.isDebugEnabled())
+                    log.debug("Did not receive node left message for local node (will stop anyway) [sock=" +
+                        closing + ']');
+            }
+            catch (IOException | GridException e) {
+                if (log.isDebugEnabled())
+                    U.error(log, "Failed to send node left message (will stop anyway) [sock=" + closing + ']', e);
+            }
+            finally {
+                U.closeQuiet(closing);
+            }
+        }
+
+        U.interrupt(sockRdr);
+        U.join(sockRdr, log);
+
+        U.interrupt(sockTimeoutWorker);
+        U.join(sockTimeoutWorker, log);
+
+        unregisterMBean();
+
+        if (log.isDebugEnabled())
+            log.debug(stopInfo());
+    }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Object> injectables() {
+ return Arrays.<Object>asList(ipFinder);
+ }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        // Only nodes flagged as visible are exposed to callers.
+        P1<TcpDiscoveryNode> visibleOnly = new P1<TcpDiscoveryNode>() {
+            @Override public boolean apply(TcpDiscoveryNode node) {
+                return node.visible();
+            }
+        };
+
+        return F.view(U.<TcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), visibleOnly));
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        if (locNodeId.equals(nodeId))
+            return locNode;
+
+        TcpDiscoveryNode rmt = rmtNodes.get(nodeId);
+
+        // Unknown or not-yet-visible remote nodes are reported as absent.
+        if (rmt == null || !rmt.visible())
+            return null;
+
+        return rmt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        assert nodeId != null;
+
+        // The local node always answers its own ping.
+        if (nodeId.equals(locNodeId))
+            return true;
+
+        TcpDiscoveryNode rmt = rmtNodes.get(nodeId);
+
+        if (rmt == null)
+            return false;
+
+        return rmt.visible();
+    }
+
+ /** {@inheritDoc} */
+ @Override public void disconnect() throws IgniteSpiException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+ // No-op.
+ }
+
+    /**
+     * Tries to join (or reconnect to) the topology through one of the addresses
+     * returned by the IP finder. Addresses that fail or time out are dropped from the
+     * working set; when the set becomes empty the addresses are resolved again after a pause.
+     *
+     * @param recon Reconnect flag: if {@code true} a reconnect message is sent instead of a join request.
+     * @return {@code True} if joined successfully, {@code false} if reconnect failed
+     *      or the joining thread was interrupted.
+     * @throws org.apache.ignite.spi.IgniteSpiException If a join error was reported while joining.
+     */
+    private boolean joinTopology(boolean recon) throws IgniteSpiException {
+        if (!recon)
+            stats.onJoinStarted();
+
+        Collection<InetSocketAddress> addrs = null;
+
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                while (addrs == null || addrs.isEmpty()) {
+                    addrs = resolvedAddresses();
+
+                    if (!F.isEmpty(addrs)) {
+                        if (log.isDebugEnabled())
+                            log.debug("Resolved addresses from IP finder: " + addrs);
+                    }
+                    else {
+                        U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + ipFinder);
+
+                        U.sleep(2000);
+                    }
+                }
+
+                Iterator<InetSocketAddress> it = addrs.iterator();
+
+                while (it.hasNext() && !Thread.currentThread().isInterrupted()) {
+                    InetSocketAddress addr = it.next();
+
+                    Socket sock = null;
+
+                    try {
+                        long ts = U.currentTimeMillis();
+
+                        IgniteBiTuple<Socket, UUID> t = initConnection(addr);
+
+                        sock = t.get1();
+
+                        UUID rmtNodeId = t.get2();
+
+                        stats.onClientSocketInitialized(U.currentTimeMillis() - ts);
+
+                        locNode.clientRouterNodeId(rmtNodeId);
+
+                        TcpDiscoveryAbstractMessage msg = recon ?
+                            new TcpDiscoveryClientReconnectMessage(locNodeId, rmtNodeId, lastMsgId) :
+                            new TcpDiscoveryJoinRequestMessage(locNode, null);
+
+                        msg.client(true);
+
+                        writeToSocket(sock, msg);
+
+                        int res = readReceipt(sock, ackTimeout);
+
+                        switch (res) {
+                            case RES_OK:
+                                this.sock = sock;
+
+                                sockRdr = new SocketReader(rmtNodeId, new MessageWorker(recon));
+                                sockRdr.start();
+
+                                if (U.await(joinLatch, netTimeout, MILLISECONDS)) {
+                                    IgniteSpiException joinErr0 = joinErr;
+
+                                    if (joinErr0 != null)
+                                        throw joinErr0;
+
+                                    if (reconFailed) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to reconnect, will try to rejoin [locNode=" +
+                                                locNode + ']');
+
+                                        U.closeQuiet(sock);
+
+                                        U.interrupt(sockRdr);
+                                        U.join(sockRdr, log);
+
+                                        this.sock = null;
+
+                                        return false;
+                                    }
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Successfully connected to topology [sock=" + sock + ']');
+
+                                    hbSender = new HeartbeatSender();
+                                    hbSender.start();
+
+                                    stats.onJoinFinished();
+
+                                    return true;
+                                }
+                                else {
+                                    U.warn(log, "Join process timed out (will try other address) [sock=" + sock +
+                                        ", timeout=" + netTimeout + ']');
+
+                                    U.closeQuiet(sock);
+
+                                    U.interrupt(sockRdr);
+                                    U.join(sockRdr, log);
+
+                                    it.remove();
+
+                                    // Leaves the switch only; the address loop continues.
+                                    break;
+                                }
+
+                            case RES_CONTINUE_JOIN:
+                            case RES_WAIT:
+                                U.closeQuiet(sock);
+
+                                break;
+
+                            default:
+                                if (log.isDebugEnabled())
+                                    log.debug("Received unexpected response to join request: " + res);
+
+                                U.closeQuiet(sock);
+                        }
+                    }
+                    catch (GridInterruptedException ignored) {
+                        if (log.isDebugEnabled())
+                            log.debug("Joining thread was interrupted.");
+
+                        return false;
+                    }
+                    catch (IOException | GridException e) {
+                        if (log.isDebugEnabled())
+                            U.error(log, "Failed to establish connection with address: " + addr, e);
+
+                        U.closeQuiet(sock);
+
+                        it.remove();
+                    }
+                }
+
+                if (addrs.isEmpty()) {
+                    // 'addrs' is empty here, so report the IP finder itself (as the warning above does).
+                    U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
+                        "in 2000ms): " + ipFinder);
+
+                    U.sleep(2000);
+                }
+            }
+            catch (GridInterruptedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Joining thread was interrupted.");
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Opens a socket to the given address and performs the client handshake.
+     * A fresh join latch is created before the connection attempt. If the handshake
+     * fails for any reason, the opened socket is closed before the error is propagated.
+     *
+     * @param addr Address.
+     * @return Tuple of the opened socket and the ID of the remote node that answered the handshake.
+     * @throws IOException In case of I/O error.
+     * @throws GridException In case of other error.
+     */
+    private IgniteBiTuple<Socket, UUID> initConnection(InetSocketAddress addr) throws IOException, GridException {
+        assert addr != null;
+
+        joinLatch = new CountDownLatch(1);
+
+        Socket sock = openSocket(addr);
+
+        boolean ok = false;
+
+        try {
+            TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
+
+            req.client(true);
+
+            writeToSocket(sock, req);
+
+            TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout);
+
+            UUID nodeId = res.creatorNodeId();
+
+            assert nodeId != null;
+            assert !locNodeId.equals(nodeId);
+
+            IgniteBiTuple<Socket, UUID> t = F.t(sock, nodeId);
+
+            ok = true;
+
+            return t;
+        }
+        finally {
+            // The caller only learns about the socket on success, so it must be released here on failure.
+            if (!ok)
+                U.closeQuiet(sock);
+        }
+    }
+
+ /**
+ * FOR TEST PURPOSE ONLY!
+ * <p>
+ * Closes the current socket quietly, then interrupts and joins, in this order, the disconnect
+ * handler, the heartbeat sender, the socket reader and the socket timeout worker.
+ */
+ void simulateNodeFailure() {
+ U.warn(log, "Simulating client node failure: " + locNodeId);
+
+ U.closeQuiet(sock);
+
+ U.interrupt(disconnectHnd);
+ U.join(disconnectHnd, log);
+
+ U.interrupt(hbSender);
+ U.join(hbSender, log);
+
+ U.interrupt(sockRdr);
+ U.join(sockRdr, log);
+
+ U.interrupt(sockTimeoutWorker);
+ U.join(sockTimeoutWorker, log);
+ }
+
+    /**
+     * Disconnect handler. Periodically (every {@code disconnectCheckInt} milliseconds) checks
+     * whether the SPI socket is gone and, if so, stops the heartbeat sender and socket reader
+     * and tries to reconnect; if reconnection fails it falls back to a full rejoin.
+     */
+    private class DisconnectHandler extends IgniteSpiThread {
+        /**
+         */
+        protected DisconnectHandler() {
+            super(gridName, "tcp-client-disco-disconnect-hnd", log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            while (!isInterrupted()) {
+                try {
+                    U.sleep(disconnectCheckInt);
+
+                    if (sock == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Node is disconnected from topology, will try to reconnect.");
+
+                        U.interrupt(hbSender);
+                        U.join(hbSender, log);
+
+                        U.interrupt(sockRdr);
+                        U.join(sockRdr, log);
+
+                        // If reconnection fails, try to rejoin.
+                        if (!joinTopology(true)) {
+                            rmtNodes.clear();
+
+                            locNode.order(0);
+
+                            // Report the reconnect only if the rejoin really succeeded
+                            // (joinTopology returns false e.g. when the thread is interrupted).
+                            if (joinTopology(false)) {
+                                getSpiContext().recordEvent(new IgniteDiscoveryEvent(locNode,
+                                    "Client node reconnected: " + locNode,
+                                    EVT_CLIENT_NODE_RECONNECTED, locNode));
+                            }
+                        }
+                    }
+                }
+                catch (GridInterruptedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Disconnect handler was interrupted.");
+
+                    return;
+                }
+                catch (IgniteSpiException e) {
+                    U.error(log, "Failed to reconnect to topology after failure.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Heartbeat sender. Every {@code hbFreq} milliseconds enqueues a client heartbeat
+     * message through the current socket reader until the thread is interrupted.
+     */
+    private class HeartbeatSender extends IgniteSpiThread {
+        /**
+         */
+        protected HeartbeatSender() {
+            super(gridName, "tcp-client-disco-heartbeat-sender", log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            // Nothing to do if the node has already lost its socket.
+            if (sock == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to start heartbeat sender, node is already disconnected.");
+
+                return;
+            }
+
+            try {
+                while (true) {
+                    if (isInterrupted())
+                        return;
+
+                    U.sleep(hbFreq);
+
+                    TcpDiscoveryHeartbeatMessage hb = new TcpDiscoveryHeartbeatMessage(locNodeId);
+
+                    hb.client(true);
+
+                    sockRdr.addMessage(hb);
+                }
+            }
+            catch (GridInterruptedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Heartbeat sender was interrupted.");
+            }
+        }
+    }
+
+ /**
+ * Socket reader. Reads discovery messages from the SPI's current socket and passes
+ * them to its message worker. On exit it closes the socket, stops the message worker
+ * and resets the SPI's socket reference to {@code null}.
+ */
+ private class SocketReader extends IgniteSpiThread {
+ /** Remote node ID. */
+ private final UUID nodeId;
+
+ /** Message worker. */
+ private final MessageWorker msgWrk;
+
+ /**
+ * @param nodeId Node ID (must not be {@code null}).
+ * @param msgWrk Message worker (must not be {@code null}).
+ */
+ protected SocketReader(UUID nodeId, MessageWorker msgWrk) {
+ super(gridName, "tcp-client-disco-sock-reader", log);
+
+ assert nodeId != null;
+ assert msgWrk != null;
+
+ this.nodeId = nodeId;
+ this.msgWrk = msgWrk;
+ }
+
+ /** Starts this reader thread and then its message worker. */
+ @Override public synchronized void start() {
+ super.start();
+
+ msgWrk.start();
+ }
+
+ /**
+ * Reads messages until interrupted or until the connection fails. While the join latch
+ * is still open, a duplicate-ID, authentication-failed or check-failed message is turned
+ * into a join error, the latch is released and the reader stops.
+ */
+ @Override protected void body() throws InterruptedException {
+ Socket sock0 = sock;
+
+ if (sock0 == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to start socket reader, node is already disconnected.");
+
+ return;
+ }
+
+ try {
+ InputStream in = new BufferedInputStream(sock0.getInputStream());
+
+ sock0.setKeepAlive(true);
+ sock0.setTcpNoDelay(true);
+
+ while (!isInterrupted()) {
+ try {
+ TcpDiscoveryAbstractMessage msg = marsh.unmarshal(in, U.gridClassLoader());
+
+ msg.senderNodeId(nodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Message has been received: " + msg);
+
+ stats.onMessageReceived(msg);
+
+ IgniteSpiException err = null;
+
+ if (joinLatch.getCount() > 0) { // Join is still in progress.
+ if (msg instanceof TcpDiscoveryDuplicateIdMessage)
+ err = duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg);
+ else if (msg instanceof TcpDiscoveryAuthFailedMessage)
+ err = authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg);
+ else if (msg instanceof TcpDiscoveryCheckFailedMessage)
+ err = checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
+
+ if (err != null) {
+ joinErr = err;
+
+ joinLatch.countDown();
+
+ return; // The finally block below still runs and cleans up.
+ }
+ }
+
+ msgWrk.addMessage(msg);
+ }
+ catch (GridException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Failed to read message [sock=" + sock0 + ", locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId + ']', e);
+
+ IOException ioEx = X.cause(e, IOException.class);
+
+ if (ioEx != null)
+ throw ioEx; // Handled by the outer catch as a connection failure.
+
+ ClassNotFoundException clsNotFoundEx = X.cause(e, ClassNotFoundException.class);
+
+ if (clsNotFoundEx != null)
+ LT.warn(log, null, "Failed to read message due to ClassNotFoundException " +
+ "(make sure same versions of all classes are available on all nodes) " +
+ "[rmtNodeId=" + nodeId + ", err=" + clsNotFoundEx.getMessage() + ']');
+ else
+ LT.error(log, e, "Failed to read message [sock=" + sock0 + ", locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId + ']');
+ }
+ }
+ }
+ catch (IOException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Connection failed [sock=" + sock0 + ", locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId + ']', e);
+ }
+ finally {
+ U.closeQuiet(sock0);
+
+ U.interrupt(msgWrk);
+
+ try {
+ U.join(msgWrk);
+ }
+ catch (GridInterruptedException ignored) {
+ // No-op.
+ }
+
+ sock = null; // The disconnect handler checks for a null socket to trigger a reconnect.
+ }
+ }
+
+ /**
+ * Adds a message to the message worker's queue.
+ *
+ * @param msg Message (must not be {@code null}).
+ */
+ void addMessage(TcpDiscoveryAbstractMessage msg) {
+ assert msg != null;
+
+ msgWrk.addMessage(msg);
+ }
+ }
+
+ /**
+ * Message worker.
+ */
+ private class MessageWorker extends MessageWorkerAdapter {
+ /** Topology history. */
+ private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
+ /** Indicates that reconnection is in progress. */
+ private boolean recon;
+
+ /** Indicates that pending messages are currently processed. */
+ private boolean pending;
+
+ /**
+ * @param recon Whether reconnection is in progress.
+ */
+ protected MessageWorker(boolean recon) {
+ super("tcp-client-disco-msg-worker");
+
+ this.recon = recon;
+ }
+
+        /** {@inheritDoc} */
+        @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null;
+            assert msg.verified() || msg.senderNodeId() == null;
+
+            stats.onMessageProcessingStarted(msg);
+
+            if (msg instanceof TcpDiscoveryClientReconnectMessage)
+                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+            else if (recon && !pending) {
+                // While reconnecting, only pending messages are processed; everything else is dropped.
+                if (log.isDebugEnabled())
+                    log.debug("Discarding message received during reconnection: " + msg);
+            }
+            else {
+                // Dispatch by concrete message type.
+                if (msg instanceof TcpDiscoveryNodeAddedMessage)
+                    processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+                else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                    processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
+                else if (msg instanceof TcpDiscoveryNodeLeftMessage)
+                    processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
+                else if (msg instanceof TcpDiscoveryNodeFailedMessage)
+                    processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
+                else if (msg instanceof TcpDiscoveryHeartbeatMessage)
+                    processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+
+                // Remember the last ensured message so a later reconnect can resume from it.
+                if (ensured(msg))
+                    lastMsgId = msg.id();
+            }
+
+            stats.onMessageProcessingFinished(msg);
+        }
+
+ /**
+  * Handles a node added message.
+  * <p>
+  * For a remote node the node is registered in the remote nodes map (first message wins) and
+  * its discovery data, if any, is handed to the exchange. For the local node, while the join
+  * latch is still open, the topology, topology history and old nodes discovery data carried by
+  * the message are applied and the local node is marked visible.
+  *
+  * @param msg Message.
+  */
+ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
+     // Nothing is processed once the leave latch has been set.
+     if (leaveLatch != null)
+         return;
+
+     TcpDiscoveryNode added = msg.node();
+
+     UUID addedId = added.id();
+
+     if (!locNodeId.equals(addedId)) {
+         // Remote node: an already registered node ID means there is nothing to do.
+         if (rmtNodes.putIfAbsent(addedId, added) != null)
+             return;
+
+         if (log.isDebugEnabled())
+             log.debug("Added new node to topology: " + added);
+
+         List<Object> newNodeData = msg.newNodeDiscoveryData();
+
+         if (newNodeData != null)
+             exchange.onExchange(newNodeData);
+
+         return;
+     }
+
+     // Local node: the join latch count tells whether this message was already handled.
+     if (joinLatch.getCount() <= 0) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node added message (this message has already been processed) " +
+                 "[msg=" + msg + ", locNode=" + locNode + ']');
+
+         return;
+     }
+
+     Collection<TcpDiscoveryNode> snapshot = msg.topology();
+
+     if (snapshot == null) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node added message with empty topology: " + msg);
+
+         return;
+     }
+
+     for (TcpDiscoveryNode rmt : snapshot) {
+         // Only nodes that already have an order are made visible.
+         if (rmt.order() > 0)
+             rmt.visible(true);
+
+         rmtNodes.put(rmt.id(), rmt);
+     }
+
+     // Replace the local history with the one carried by the message.
+     topHist.clear();
+
+     if (msg.topologyHistory() != null)
+         topHist.putAll(msg.topologyHistory());
+
+     Collection<List<Object>> oldNodesData = msg.oldNodesDiscoveryData();
+
+     if (oldNodesData != null) {
+         for (List<Object> discoData : oldNodesData)
+             exchange.onExchange(discoData);
+     }
+
+     locNode.setAttributes(added.attributes());
+     locNode.visible(true);
+ }
+
+ /**
+  * Handles a node add finished message.
+  * <p>
+  * For the local node (while the join latch is still open) the node order is set to the
+  * message topology version, a {@code EVT_NODE_JOINED} notification is sent and the join latch
+  * is counted down. For a known remote node the order and visibility are updated and the
+  * topology history is extended; the notification is skipped while the local join is not
+  * finished, unless pending messages are being processed.
+  *
+  * @param msg Message.
+  */
+ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
+     if (leaveLatch != null)
+         return;
+
+     if (locNodeId.equals(msg.nodeId())) {
+         if (joinLatch.getCount() <= 0) {
+             if (log.isDebugEnabled())
+                 log.debug("Discarding node add finished message (this message has already been processed) " +
+                     "[msg=" + msg + ", locNode=" + locNode + ']');
+
+             return;
+         }
+
+         long locOrder = msg.topologyVersion();
+
+         locNode.order(locOrder);
+
+         notifyDiscovery(EVT_NODE_JOINED, locOrder, locNode, updateTopologyHistory(locOrder));
+
+         joinErr = null;
+
+         joinLatch.countDown();
+
+         return;
+     }
+
+     TcpDiscoveryNode joined = rmtNodes.get(msg.nodeId());
+
+     if (joined == null) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']');
+
+         return;
+     }
+
+     long newTopVer = msg.topologyVersion();
+
+     joined.order(newTopVer);
+     joined.visible(true);
+
+     // Reuse the local version instance when the remote node reports an equal version.
+     if (locNodeVer.equals(joined.version()))
+         joined.version(locNodeVer);
+
+     // History is updated even if the notification below ends up being skipped.
+     Collection<ClusterNode> snapshot = updateTopologyHistory(newTopVer);
+
+     if (pending || joinLatch.getCount() <= 0) {
+         notifyDiscovery(EVT_NODE_JOINED, newTopVer, joined, snapshot);
+
+         stats.onNodeJoined();
+     }
+     else if (log.isDebugEnabled())
+         log.debug("Discarding node add finished message (join process is not finished): " + msg);
+ }
+
+ /**
+  * Handles a node left message.
+  * <p>
+  * A message created by the local node counts down the leave latch. For any other creator the
+  * node is removed from the remote nodes map, the topology history is extended and, unless the
+  * local join is still unfinished (and pending messages are not being processed), a
+  * {@code EVT_NODE_LEFT} notification is sent.
+  *
+  * @param msg Message.
+  */
+ private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
+     if (locNodeId.equals(msg.creatorNodeId())) {
+         if (log.isDebugEnabled())
+             log.debug("Received node left message for local node: " + msg);
+
+         // Read the field once so that the assertion and the call see the same latch.
+         CountDownLatch latch = leaveLatch;
+
+         assert latch != null;
+
+         latch.countDown();
+
+         return;
+     }
+
+     if (leaveLatch != null)
+         return;
+
+     TcpDiscoveryNode left = rmtNodes.remove(msg.creatorNodeId());
+
+     if (left == null) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node left message since node is not found [msg=" + msg + ']');
+
+         return;
+     }
+
+     // History is updated even if the notification below ends up being skipped.
+     Collection<ClusterNode> snapshot = updateTopologyHistory(msg.topologyVersion());
+
+     if (pending || joinLatch.getCount() <= 0) {
+         notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), left, snapshot);
+
+         stats.onNodeLeft();
+     }
+     else if (log.isDebugEnabled())
+         log.debug("Discarding node left message (join process is not finished): " + msg);
+ }
+
+ /**
+  * Handles a node failed message.
+  * <p>
+  * Messages created by the local node are ignored. Otherwise the failed node is removed from
+  * the remote nodes map, the topology history is extended and, unless the local join is still
+  * unfinished (and pending messages are not being processed), a {@code EVT_NODE_FAILED}
+  * notification is sent.
+  *
+  * @param msg Message.
+  */
+ private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
+     if (leaveLatch != null)
+         return;
+
+     // Nothing to do for messages created by the local node.
+     if (locNodeId.equals(msg.creatorNodeId()))
+         return;
+
+     TcpDiscoveryNode failed = rmtNodes.remove(msg.failedNodeId());
+
+     if (failed == null) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding node failed message since node is not found [msg=" + msg + ']');
+
+         return;
+     }
+
+     // History is updated even if the notification below ends up being skipped.
+     Collection<ClusterNode> snapshot = updateTopologyHistory(msg.topologyVersion());
+
+     if (pending || joinLatch.getCount() <= 0) {
+         notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), failed, snapshot);
+
+         stats.onNodeFailed();
+     }
+     else if (log.isDebugEnabled())
+         log.debug("Discarding node failed message (join process is not finished): " + msg);
+ }
+
+ /**
+  * Handles a heartbeat message.
+  * <p>
+  * A heartbeat created by another node is used to update node metrics. A heartbeat created by
+  * the local node that has no sender yet is filled with local metrics and written to the current
+  * socket; on a write failure the socket is closed, the socket reference is cleared and
+  * {@code interrupt()} is called. A local heartbeat that already has a sender is only logged.
+  *
+  * @param msg Message.
+  */
+ private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
+     if (leaveLatch != null)
+         return;
+
+     if (!locNodeId.equals(msg.creatorNodeId())) {
+         if (!msg.hasMetrics())
+             return;
+
+         // One timestamp is shared by all metrics updates coming from this message.
+         long now = U.currentTimeMillis();
+
+         for (Map.Entry<UUID, MetricsSet> entry : msg.metrics().entrySet()) {
+             MetricsSet set = entry.getValue();
+
+             updateMetrics(entry.getKey(), set.metrics(), now);
+
+             for (T2<UUID, ClusterNodeMetrics> clientMetrics : set.clientMetrics())
+                 updateMetrics(clientMetrics.get1(), clientMetrics.get2(), now);
+         }
+
+         return;
+     }
+
+     if (msg.senderNodeId() != null) {
+         if (log.isDebugEnabled())
+             log.debug("Received heartbeat response: " + msg);
+
+         return;
+     }
+
+     // Work with a local copy of the socket reference for the whole send attempt.
+     Socket sock0 = sock;
+
+     if (sock0 == null) {
+         if (log.isDebugEnabled())
+             log.debug("Failed to send heartbeat message (node is disconnected): " + msg);
+
+         return;
+     }
+
+     msg.setMetrics(locNodeId, metricsProvider.getMetrics());
+
+     try {
+         writeToSocket(sock0, msg);
+
+         if (log.isDebugEnabled())
+             log.debug("Heartbeat message sent [sock=" + sock0 + ", msg=" + msg + ']');
+     }
+     catch (IOException | GridException e) {
+         // The failure is reported only when debug logging is enabled.
+         if (log.isDebugEnabled())
+             U.error(log, "Failed to send heartbeat message [sock=" + sock0 +
+                 ", msg=" + msg + ']', e);
+
+         U.closeQuiet(sock0);
+
+         sock = null;
+
+         interrupt();
+     }
+ }
+
+ /**
+  * Handles a client reconnect message.
+  * <p>
+  * Messages created by other clients are discarded. On success the pending messages carried by
+  * the message are processed with the {@code pending} flag raised; on failure a
+  * {@code EVT_CLIENT_NODE_DISCONNECTED} event is recorded. In both cases the join error is
+  * cleared, {@code reconFailed} is set accordingly and the join latch is counted down.
+  *
+  * @param msg Message.
+  */
+ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+     if (leaveLatch != null)
+         return;
+
+     if (!locNodeId.equals(msg.creatorNodeId())) {
+         if (log.isDebugEnabled())
+             log.debug("Discarding reconnect message for another client: " + msg);
+
+         return;
+     }
+
+     boolean reconnected = msg.success();
+
+     if (reconnected) {
+         // Raise the flag so that replayed messages are not discarded by processMessage().
+         pending = true;
+
+         try {
+             for (TcpDiscoveryAbstractMessage queued : msg.pendingMessages())
+                 processMessage(queued);
+         }
+         finally {
+             pending = false;
+         }
+     }
+
+     joinErr = null;
+     reconFailed = !reconnected;
+
+     if (!reconnected)
+         getSpiContext().recordEvent(new IgniteDiscoveryEvent(locNode,
+             "Client node disconnected: " + locNode,
+             EVT_CLIENT_NODE_DISCONNECTED, locNode));
+
+     joinLatch.countDown();
+ }
+
+ /**
+  * Applies metrics to the local node or to a known remote node and sends a
+  * {@code EVT_NODE_METRICS_UPDATED} notification. Metrics for an unknown or not yet visible
+  * node are only logged.
+  *
+  * @param nodeId Node ID.
+  * @param metrics Metrics.
+  * @param tstamp Timestamp.
+  */
+ private void updateMetrics(UUID nodeId, ClusterNodeMetrics metrics, long tstamp) {
+     assert nodeId != null;
+     assert metrics != null;
+
+     TcpDiscoveryNode target;
+
+     if (nodeId.equals(locNodeId))
+         target = locNode;
+     else
+         target = rmtNodes.get(nodeId);
+
+     if (target == null || !target.visible()) {
+         if (log.isDebugEnabled())
+             log.debug("Received metrics from unknown node: " + nodeId);
+
+         return;
+     }
+
+     target.setMetrics(metrics);
+
+     target.lastUpdateTime(tstamp);
+
+     notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, target, allNodes());
+ }
+
+ /**
+  * Records the given version as the current topology version and, if the version is not yet
+  * present in the history, stores the current node snapshot under it.
+  * <p>
+  * After the new entry is added, the oldest entries are evicted until the history fits into
+  * {@code topHistSize}.
+  *
+  * @param topVer New topology version.
+  * @return Latest topology snapshot.
+  */
+ private Collection<ClusterNode> updateTopologyHistory(long topVer) {
+     TcpClientDiscoverySpi.this.topVer = topVer;
+
+     Collection<ClusterNode> allNodes = allNodes();
+
+     if (!topHist.containsKey(topVer)) {
+         // Versions are expected to arrive one by one, without gaps.
+         assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+             "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+
+         topHist.put(topVer, allNodes);
+
+         // The history may have been seeded in bulk (see putAll() in processNodeAddedMessage())
+         // with more entries than the local limit allows, so a single eviction is not enough.
+         // The emptiness check keeps the loop finite for a non-positive limit.
+         while (!topHist.isEmpty() && topHist.size() > topHistSize)
+             topHist.pollFirstEntry();
+
+         assert topHist.lastKey() == topVer;
+         assert topHist.size() <= topHistSize;
+     }
+
+     return allNodes;
+ }
+
+ /**
+  * Builds a new {@link TreeSet} holding every visible remote node plus the local node.
+  *
+  * @return All nodes.
+  */
+ private Collection<ClusterNode> allNodes() {
+     Collection<ClusterNode> snapshot = new TreeSet<>();
+
+     for (TcpDiscoveryNode rmt : rmtNodes.values()) {
+         // Remote nodes that are not visible yet are left out of the snapshot.
+         if (!rmt.visible())
+             continue;
+
+         snapshot.add(rmt);
+     }
+
+     snapshot.add(locNode);
+
+     return snapshot;
+ }
+
+ /**
+  * Passes a discovery event to the SPI listener together with a copy of the topology history.
+  * When no listener is set the notification is skipped (and logged at debug level).
+  *
+  * @param type Event type.
+  * @param topVer Topology version.
+  * @param node Node.
+  * @param top Topology snapshot.
+  */
+ private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) {
+     // Read the listener field once so that the null check and the call use the same instance.
+     DiscoverySpiListener lsnr0 = TcpClientDiscoverySpi.this.lsnr;
+
+     if (lsnr0 == null) {
+         if (log.isDebugEnabled())
+             log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+                 ", topVer=" + topVer + ']');
+
+         return;
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) +
+             ", topVer=" + topVer + ']');
+
+     // The listener receives its own copy of the history map.
+     lsnr0.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
new file mode 100644
index 0000000..95c7fb9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMBean.java
@@ -0,0 +1,156 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.mbean.*;
+import org.apache.ignite.spi.*;
+
+import java.util.*;
+
+/**
+ * Management bean for {@link TcpClientDiscoverySpi}.
+ * <p>
+ * Exposes the SPI timeouts and intervals, the IP finder in string form and
+ * message/topology statistics.
+ */
+public interface TcpClientDiscoverySpiMBean extends IgniteSpiManagementMBean {
+    /**
+     * Gets disconnect check interval.
+     *
+     * @return Disconnect check interval.
+     */
+    @IgniteMBeanDescription("Disconnect check interval.")
+    public long getDisconnectCheckInterval();
+
+    /**
+     * Gets socket timeout.
+     *
+     * @return Socket timeout.
+     */
+    @IgniteMBeanDescription("Socket timeout.")
+    public long getSocketTimeout();
+
+    /**
+     * Gets message acknowledgement timeout.
+     *
+     * @return Message acknowledgement timeout.
+     */
+    @IgniteMBeanDescription("Message acknowledgement timeout.")
+    public long getAckTimeout();
+
+    /**
+     * Gets network timeout.
+     *
+     * @return Network timeout.
+     */
+    @IgniteMBeanDescription("Network timeout.")
+    public long getNetworkTimeout();
+
+    /**
+     * Gets thread priority. All threads within SPI will be started with it.
+     *
+     * @return Thread priority.
+     */
+    @IgniteMBeanDescription("Threads priority.")
+    public int getThreadPriority();
+
+    /**
+     * Gets delay between heartbeat messages sent by coordinator.
+     *
+     * @return Time period in milliseconds.
+     */
+    @IgniteMBeanDescription("Heartbeat frequency.")
+    public long getHeartbeatFrequency();
+
+    /**
+     * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation).
+     *
+     * @return IPFinder (string representation).
+     */
+    @IgniteMBeanDescription("IP Finder.")
+    public String getIpFinderFormatted();
+
+    /**
+     * Gets message worker queue current size.
+     *
+     * @return Message worker queue current size.
+     */
+    @IgniteMBeanDescription("Message worker queue current size.")
+    public int getMessageWorkerQueueSize();
+
+    /**
+     * Gets joined nodes count.
+     *
+     * @return Nodes joined count.
+     */
+    @IgniteMBeanDescription("Nodes joined count.")
+    public long getNodesJoined();
+
+    /**
+     * Gets left nodes count.
+     *
+     * @return Left nodes count.
+     */
+    @IgniteMBeanDescription("Nodes left count.")
+    public long getNodesLeft();
+
+    /**
+     * Gets failed nodes count.
+     *
+     * @return Failed nodes count.
+     */
+    @IgniteMBeanDescription("Nodes failed count.")
+    public long getNodesFailed();
+
+    /**
+     * Gets average message processing time.
+     *
+     * @return Average message processing time.
+     */
+    @IgniteMBeanDescription("Avg message processing time.")
+    public long getAvgMessageProcessingTime();
+
+    /**
+     * Gets maximum message processing time.
+     *
+     * @return Maximum message processing time.
+     */
+    @IgniteMBeanDescription("Max message processing time.")
+    public long getMaxMessageProcessingTime();
+
+    /**
+     * Gets total received messages count.
+     *
+     * @return Total received messages count.
+     */
+    @IgniteMBeanDescription("Total received messages count.")
+    public int getTotalReceivedMessages();
+
+    /**
+     * Gets received messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @IgniteMBeanDescription("Received messages by type.")
+    public Map<String, Integer> getReceivedMessages();
+
+    /**
+     * Gets total processed messages count.
+     *
+     * @return Total processed messages count.
+     */
+    @IgniteMBeanDescription("Total processed messages count.")
+    public int getTotalProcessedMessages();
+
+    /**
+     * Gets processed messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @IgniteMBeanDescription("Processed messages by type.")
+    public Map<String, Integer> getProcessedMessages();
+}