You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 15:07:37 UTC
[36/52] [abbrv] incubator-ignite git commit: # Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
new file mode 100644
index 0000000..5377e18
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -0,0 +1,996 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.product.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.io.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
+
+/**
+ * Base class for TCP discovery SPIs.
+ */
+abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi {
+ /** Default port to listen (value is <tt>47500</tt>). */
+ public static final int DFLT_PORT = 47500;
+
+ /** Default socket operations timeout in milliseconds (value is <tt>2,000ms</tt>). */
+ public static final long DFLT_SOCK_TIMEOUT = 2000;
+
+ /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
+ public static final long DFLT_ACK_TIMEOUT = 5000;
+
+ /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
+ public static final long DFLT_NETWORK_TIMEOUT = 5000;
+
+ /** Default value for thread priority (value is <tt>10</tt>). */
+ public static final int DFLT_THREAD_PRI = 10;
+
+ /** Default heartbeat messages issuing frequency (value is <tt>2,000ms</tt>). */
+ public static final long DFLT_HEARTBEAT_FREQ = 2000;
+
+ /** Default size of topology snapshots history. */
+ public static final int DFLT_TOP_HISTORY_SIZE = 1000;
+
+ /** Response OK. */
+ protected static final int RES_OK = 1;
+
+ /** Response CONTINUE JOIN. */
+ protected static final int RES_CONTINUE_JOIN = 100;
+
+ /** Response WAIT. */
+ protected static final int RES_WAIT = 200;
+
+ /** Local address. */
+ protected String locAddr;
+
+ /** IP finder. */
+ protected TcpDiscoveryIpFinder ipFinder;
+
+ /** Socket operations timeout. */
+ protected long sockTimeout = DFLT_SOCK_TIMEOUT;
+
+ /** Message acknowledgement timeout. */
+ protected long ackTimeout = DFLT_ACK_TIMEOUT;
+
+ /** Network timeout. */
+ protected long netTimeout = DFLT_NETWORK_TIMEOUT;
+
+ /** Thread priority for all threads started by SPI. */
+ protected int threadPri = DFLT_THREAD_PRI;
+
+ /** Heartbeat messages issuing frequency. */
+ protected long hbFreq = DFLT_HEARTBEAT_FREQ;
+
+ /** Size of topology snapshots history. */
+ protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
+
+ /** Grid discovery listener. */
+ protected volatile DiscoverySpiListener lsnr;
+
+ /** Data exchange. */
+ protected DiscoverySpiDataExchange exchange;
+
+ /** Metrics provider. */
+ protected DiscoveryMetricsProvider metricsProvider;
+
+ /** Local node attributes. */
+ protected Map<String, Object> locNodeAttrs;
+
+ /** Local node version. */
+ protected IgniteProductVersion locNodeVer;
+
+ /** Local node. */
+ protected TcpDiscoveryNode locNode;
+
+ /** Local host. */
+ protected InetAddress locHost;
+
+ /** Internal and external addresses of local node. */
+ protected Collection<InetSocketAddress> locNodeAddrs;
+
+ /** Socket timeout worker. */
+ protected SocketTimeoutWorker sockTimeoutWorker;
+
+ /** Discovery state. */
+ protected TcpDiscoverySpiState spiState = DISCONNECTED;
+
+ /** Start time of the very first grid node. */
+ protected volatile long gridStartTime;
+
+ /** Marshaller. */
+ protected final IgniteMarshaller marsh = new IgniteJdkMarshaller();
+
+ /** Statistics. */
+ protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
+
+ /** Local node ID. */
+ @IgniteLocalNodeIdResource
+ protected UUID locNodeId;
+
+ /** Name of the grid. */
+ @IgniteNameResource
+ protected String gridName;
+
+ /** Logger. */
+ @IgniteLoggerResource
+ protected IgniteLogger log;
+
+ /**
+ * Sets local host IP address that discovery SPI uses.
+ * <p>
+ * If not provided, by default a first found non-loopback address
+ * will be used. If there is no non-loopback address available,
+ * then {@link InetAddress#getLocalHost()} will be used.
+ *
+ * @param locAddr IP address.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ @IgniteLocalHostResource
+ public void setLocalAddress(String locAddr) {
+ // Injection should not override value already set by Spring or user.
+ if (this.locAddr == null)
+ this.locAddr = locAddr;
+ }
+
+ /**
+ * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method.
+ *
+ * @return local address.
+ */
+ public String getLocalAddress() {
+ return locAddr;
+ }
+
+ /**
+ * Gets IP finder for IP addresses sharing and storing.
+ *
+ * @return IP finder for IP addresses sharing and storing.
+ */
+ public TcpDiscoveryIpFinder getIpFinder() {
+ return ipFinder;
+ }
+
+ /**
+ * Sets IP finder for IP addresses sharing and storing.
+ * <p>
+ * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
+ *
+ * @param ipFinder IP finder.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setIpFinder(TcpDiscoveryIpFinder ipFinder) {
+ this.ipFinder = ipFinder;
+ }
+
+ /**
+ * Sets socket operations timeout. This timeout is used to limit connection time and
+ * write-to-socket time.
+ * <p>
+ * Note that when running GridGain on Amazon EC2, socket timeout must be set to a value
+ * significantly greater than the default (e.g. to {@code 30000}).
+ * <p>
+ * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}.
+ *
+ * @param sockTimeout Socket connection timeout.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setSocketTimeout(long sockTimeout) {
+ this.sockTimeout = sockTimeout;
+ }
+
+ /**
+ * Sets timeout for receiving acknowledgement for sent message.
+ * <p>
+ * If acknowledgement is not received within this timeout, sending is considered as failed
+ * and SPI tries to repeat message sending.
+ * <p>
+ * If not specified, default is {@link #DFLT_ACK_TIMEOUT}.
+ *
+ * @param ackTimeout Acknowledgement timeout.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setAckTimeout(long ackTimeout) {
+ this.ackTimeout = ackTimeout;
+ }
+
+ /**
+ * Sets maximum network timeout to use for network operations.
+ * <p>
+ * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
+ *
+ * @param netTimeout Network timeout.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setNetworkTimeout(long netTimeout) {
+ this.netTimeout = netTimeout;
+ }
+
+ /**
+ * Sets thread priority. All threads within SPI will be started with it.
+ * <p>
+ * If not provided, default value is {@link #DFLT_THREAD_PRI}
+ *
+ * @param threadPri Thread priority.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setThreadPriority(int threadPri) {
+ this.threadPri = threadPri;
+ }
+
+ /**
+ * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
+ * in configurable time interval to other nodes to notify them about its state.
+ * <p>
+ * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
+ *
+ * @param hbFreq Heartbeat frequency in milliseconds.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setHeartbeatFrequency(long hbFreq) {
+ this.hbFreq = hbFreq;
+ }
+
+ /**
+ * @return Size of topology snapshots history.
+ */
+ public long getTopHistorySize() {
+ return topHistSize;
+ }
+
+ /**
+ * Sets size of topology snapshots history. Specified size should be greater than or equal to default size
+ * {@link #DFLT_TOP_HISTORY_SIZE}.
+ *
+ * @param topHistSize Size of topology snapshots history.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setTopHistorySize(int topHistSize) {
+ if (topHistSize < DFLT_TOP_HISTORY_SIZE) {
+ U.warn(log, "Topology history size should be greater than or equal to default size. " +
+ "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
+ ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');
+
+ return;
+ }
+
+ this.topHistSize = topHistSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
+ assert locNodeAttrs == null;
+ assert locNodeVer == null;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Node attributes to set: " + attrs);
+ log.debug("Node version to set: " + ver);
+ }
+
+ locNodeAttrs = attrs;
+ locNodeVer = ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+ super.onContextInitialized0(spiCtx);
+
+ ipFinder.onSpiContextInitialized(spiCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onContextDestroyed0() {
+ super.onContextDestroyed0();
+
+ ipFinder.onSpiContextDestroyed();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode getLocalNode() {
+ return locNode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+ this.lsnr = lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
+ this.exchange = exchange;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+ this.metricsProvider = metricsProvider;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getGridStartTime() {
+ assert gridStartTime != 0;
+
+ return gridStartTime;
+ }
+
+ /**
+ * @param sockAddr Remote address to connect to (resolved by host name first if it is still unresolved).
+ * @return Socket bound to the local host and connected to the address, with {@code U.GG_HEADER} written.
+ * @throws IOException If binding, connecting or writing the header failed (the socket is closed then).
+ */
+ protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+ assert sockAddr != null;
+
+ InetSocketAddress resolved = sockAddr.isUnresolved() ?
+ new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;
+
+ assert resolved.getAddress() != null;
+
+ Socket sock = new Socket();
+
+ try {
+ sock.bind(new InetSocketAddress(locHost, 0));
+ sock.setTcpNoDelay(true);
+ sock.connect(resolved, (int)sockTimeout);
+ writeToSocket(sock, U.GG_HEADER);
+ return sock;
+ }
+ catch (IOException | RuntimeException e) {
+ U.closeQuiet(sock); // The caller never receives this socket, so close it here to avoid a leak.
+ throw e;
+ }
+ }
+
+ /**
+ * Writes raw bytes to the socket, limiting the write time by {@code sockTimeout}.
+ *
+ * @param sock Socket.
+ * @param data Raw data to write.
+ * @throws IOException If IO failed or write timed out.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ protected void writeToSocket(Socket sock, byte[] data) throws IOException {
+ assert sock != null;
+ assert data != null;
+
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+
+ sockTimeoutWorker.addTimeoutObject(obj); // Worker closes the socket if the write is not done by the end time.
+
+ IOException err = null;
+
+ try {
+ OutputStream out = sock.getOutputStream();
+
+ out.write(data);
+
+ out.flush();
+ }
+ catch (IOException e) {
+ err = e;
+ }
+ finally {
+ boolean cancelled = obj.cancel(); // False means the timeout worker has already processed this object.
+
+ if (cancelled)
+ sockTimeoutWorker.removeTimeoutObject(obj);
+
+ // Throw original exception (it takes precedence over the timeout error below).
+ if (err != null)
+ throw err;
+
+ if (!cancelled)
+ throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
+ }
+ }
+
+ /**
+ * Writes message to the socket.
+ *
+ * @param sock Socket.
+ * @param msg Message.
+ * @throws IOException If IO failed or write timed out.
+ * @throws GridException If marshalling failed.
+ */
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, GridException {
+ writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
+ }
+
+ /**
+ * Marshals the message into the given buffer and writes it to the socket within {@code sockTimeout}.
+ *
+ * @param sock Socket.
+ * @param msg Message.
+ * @param bout Byte array output stream the message is marshalled into (it is not reset by this method).
+ * @throws IOException If IO failed or write timed out.
+ * @throws GridException If marshalling failed.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
+ throws IOException, GridException {
+ assert sock != null;
+ assert msg != null;
+ assert bout != null;
+
+ // Marshal the message first so that the timeout registered below covers only the socket write.
+ marsh.marshal(msg, bout);
+
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+
+ sockTimeoutWorker.addTimeoutObject(obj); // Worker closes the socket if the write is not done by the end time.
+
+ IOException err = null;
+
+ try {
+ OutputStream out = sock.getOutputStream();
+
+ bout.writeTo(out);
+
+ out.flush();
+ }
+ catch (IOException e) {
+ err = e;
+ }
+ finally {
+ boolean cancelled = obj.cancel(); // False means the timeout worker has already processed this object.
+
+ if (cancelled)
+ sockTimeoutWorker.removeTimeoutObject(obj);
+
+ // Throw original exception (it takes precedence over the timeout error below).
+ if (err != null)
+ throw err;
+
+ if (!cancelled)
+ throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
+ }
+ }
+
+ /**
+ * Writes response to the socket.
+ *
+ * @param sock Socket.
+ * @param res Integer response (written with {@link OutputStream#write(int)}, i.e. as one byte).
+ * @throws IOException If IO failed or write timed out.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ protected void writeToSocket(Socket sock, int res) throws IOException {
+ assert sock != null;
+
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+
+ sockTimeoutWorker.addTimeoutObject(obj);
+
+ IOException err = null;
+
+ try {
+ // Get the stream inside 'try': if it fails, 'finally' must still cancel the timeout object.
+ OutputStream out = sock.getOutputStream();
+ out.write(res);
+
+ out.flush();
+ }
+ catch (IOException e) {
+ err = e;
+ }
+ finally {
+ boolean cancelled = obj.cancel();
+
+ if (cancelled)
+ sockTimeoutWorker.removeTimeoutObject(obj);
+
+ // Throw original exception.
+ if (err != null)
+ throw err;
+
+ if (!cancelled)
+ throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
+ }
+ }
+
+ /**
+ * Reads message from the socket limiting read time.
+ *
+ * @param sock Socket.
+ * @param in Input stream (in case socket stream was wrapped).
+ * @param timeout Socket timeout for this operation in milliseconds (capped at {@link Integer#MAX_VALUE}).
+ * @return Message.
+ * @throws IOException If IO failed or read timed out.
+ * @throws GridException If unmarshalling failed.
+ */
+ protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, GridException {
+ assert sock != null;
+
+ int oldTimeout = sock.getSoTimeout();
+
+ try {
+ sock.setSoTimeout((int)Math.min(timeout, Integer.MAX_VALUE)); // A plain cast would wrap large values.
+
+ return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+ }
+ catch (IOException | GridException e) {
+ if (X.hasCause(e, SocketTimeoutException.class))
+ LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
+ "in long GC pauses on remote node). Current timeout: " + timeout + '.');
+
+ throw e;
+ }
+ finally {
+ // Quietly restore timeout.
+ try {
+ sock.setSoTimeout(oldTimeout);
+ }
+ catch (SocketException ignored) {
+ // No-op.
+ }
+ }
+ }
+
+ /**
+ * Reads message delivery receipt from the socket.
+ *
+ * @param sock Socket.
+ * @param timeout Socket timeout for this operation in milliseconds (capped at {@link Integer#MAX_VALUE}).
+ * @return Receipt.
+ * @throws IOException If IO failed, the stream ended before a receipt arrived, or read timed out.
+ */
+ protected int readReceipt(Socket sock, long timeout) throws IOException {
+ assert sock != null;
+
+ int oldTimeout = sock.getSoTimeout();
+
+ try {
+ sock.setSoTimeout((int)Math.min(timeout, Integer.MAX_VALUE)); // A plain cast would wrap large values.
+
+ int res = sock.getInputStream().read();
+
+ if (res == -1)
+ throw new EOFException();
+
+ return res;
+ }
+ catch (SocketTimeoutException e) {
+ LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
+ "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
+ "configuration property). Will retry to send message with increased timeout. " +
+ "Current timeout: " + timeout + '.');
+
+ stats.onAckTimeout();
+
+ throw e;
+ }
+ finally {
+ // Quietly restore timeout.
+ try {
+ sock.setSoTimeout(oldTimeout);
+ }
+ catch (SocketException ignored) {
+ // No-op.
+ }
+ }
+ }
+
+ /**
+ * Resolves addresses registered in the IP finder, leaves out addresses of the local
+ * node and returns the remaining ones in random order.
+ *
+ * @return Shuffled addresses without local node addresses (potentially empty but never
+ * null). NOTE(review): no explicit duplicate removal is visible in this method.
+ * @throws org.apache.ignite.spi.IgniteSpiException If the thread is interrupted while waiting to retry.
+ */
+ protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
+ List<InetSocketAddress> res = new ArrayList<>();
+
+ Collection<InetSocketAddress> addrs;
+
+ // Get consistent addresses collection (retried until the IP finder answers without an error).
+ while (true) {
+ try {
+ addrs = registeredAddresses();
+
+ break;
+ }
+ catch (IgniteSpiException e) {
+ LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
+ "(retrying every 2000 ms).");
+ }
+
+ try {
+ U.sleep(2000);
+ }
+ catch (GridInterruptedException e) {
+ throw new IgniteSpiException("Thread has been interrupted.", e);
+ }
+ }
+
+ for (InetSocketAddress addr : addrs) {
+ assert addr != null;
+
+ try {
+ InetSocketAddress resolved = addr.isUnresolved() ?
+ new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;
+
+ if (locNodeAddrs == null || !locNodeAddrs.contains(resolved))
+ res.add(resolved);
+ }
+ catch (UnknownHostException ignored) {
+ LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);
+
+ // Add address in any case (it stays unresolved and is not checked against local addresses).
+ res.add(addr);
+ }
+ }
+
+ if (!res.isEmpty())
+ Collections.shuffle(res);
+
+ return res;
+ }
+
+ /**
+ * Gets addresses registered in the IP finder, initializes addresses having no
+ * port (or 0 port) with {@link #DFLT_PORT}.
+ *
+ * @return Registered addresses.
+ * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+ */
+ protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
+ Collection<InetSocketAddress> res = new LinkedList<>();
+
+ for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
+ if (addr.getPort() == 0)
+ addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), DFLT_PORT) :
+ new InetSocketAddress(addr.getAddress(), DFLT_PORT);
+
+ res.add(addr);
+ }
+
+ return res;
+ }
+
+ /**
+ * @param msg Message.
+ * @return Error.
+ */
+ protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
+ assert msg != null;
+
+ return new IgniteSpiException("Local node has the same ID as existing node in topology " +
+ "(fix configuration and restart local node) [localNode=" + locNode +
+ ", existingNode=" + msg.node() + ']');
+ }
+
+ /**
+ * @param msg Message.
+ * @return Error.
+ */
+ protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) {
+ assert msg != null;
+
+ return new IgniteSpiException(new GridAuthenticationException("Authentication failed [nodeId=" +
+ msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']'));
+ }
+
+ /**
+ * @param msg Message.
+ * @return Error.
+ */
+ protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) {
+ assert msg != null;
+
+ return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) :
+ new IgniteSpiException(msg.error());
+ }
+
+ /**
+ * @param msg Message.
+ * @return Whether delivery of the message is ensured.
+ */
+ protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
+ return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null;
+ }
+
+ /**
+ * @param msg Failed message.
+ * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
+ * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it
+ * and create separate message for failed version check with next major release.
+ */
+ @Deprecated
+ private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) {
+ return msg.error().contains("versions are not compatible");
+ }
+
+ /**
+ * Handles sockets timeouts.
+ */
+ protected class SocketTimeoutWorker extends IgniteSpiThread {
+ /** Time-based sorted set for timeout objects. */
+ private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
+ new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
+ @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
+ long time1 = o1.endTime();
+ long time2 = o2.endTime();
+
+ long id1 = o1.id();
+ long id2 = o2.id();
+
+ return time1 < time2 ? -1 : time1 > time2 ? 1 :
+ id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
+ }
+ });
+
+ /** Mutex. */
+ private final Object mux0 = new Object();
+
+ /**
+ *
+ */
+ SocketTimeoutWorker() {
+ super(gridName, "tcp-disco-sock-timeout-worker", log);
+
+ setPriority(threadPri);
+ }
+
+ /**
+ * @param timeoutObj Timeout object to add (the worker is notified if it becomes the first one in the set).
+ */
+ @SuppressWarnings({"NakedNotify"})
+ public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
+ assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
+
+ timeoutObjs.add(timeoutObj);
+
+ if (timeoutObjs.firstx() == timeoutObj) { // New earliest end time: the worker must recompute its wait.
+ synchronized (mux0) {
+ mux0.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * @param timeoutObj Timeout object to remove.
+ */
+ public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
+ assert timeoutObj != null;
+
+ timeoutObjs.remove(timeoutObj);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ if (log.isDebugEnabled())
+ log.debug("Socket timeout worker has been started.");
+
+ while (!isInterrupted()) {
+ long now = U.currentTimeMillis();
+
+ for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) { // Ordered by end time.
+ SocketTimeoutObject timeoutObj = iter.next();
+
+ if (timeoutObj.endTime() <= now) {
+ iter.remove();
+
+ if (timeoutObj.onTimeout()) { // True only if the writer has not cancelled the object yet.
+ LT.warn(log, null, "Socket write has timed out (consider increasing " +
+ "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
+
+ stats.onSocketTimeout();
+ }
+ }
+ else
+ break; // All remaining objects end later than 'now'.
+ }
+
+ synchronized (mux0) {
+ while (true) {
+ // Access of the first element must be inside of
+ // synchronization block, so we don't miss out
+ // on thread notification events sent from
+ // 'addTimeoutObject(..)' method.
+ SocketTimeoutObject first = timeoutObjs.firstx();
+
+ if (first != null) {
+ long waitTime = first.endTime() - U.currentTimeMillis();
+
+ if (waitTime > 0)
+ mux0.wait(waitTime);
+ else
+ break; // First object is due: go back to the expiration pass above.
+ }
+ else
+ mux0.wait(5000); // Nothing is scheduled: wait until notified or 5 seconds pass.
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Socket timeout object.
+ */
+ protected static class SocketTimeoutObject {
+ /** */
+ private static final AtomicLong idGen = new AtomicLong();
+
+ /** */
+ private final long id = idGen.incrementAndGet();
+
+ /** */
+ private final Socket sock;
+
+ /** */
+ private final long endTime;
+
+ /** */
+ private final AtomicBoolean done = new AtomicBoolean();
+
+ /**
+ * @param sock Socket.
+ * @param endTime End time.
+ */
+ SocketTimeoutObject(Socket sock, long endTime) {
+ assert sock != null;
+ assert endTime > 0;
+
+ this.sock = sock;
+ this.endTime = endTime;
+ }
+
+ /**
+ * @return {@code True} if object has not yet been processed.
+ */
+ boolean cancel() {
+ return done.compareAndSet(false, true);
+ }
+
+ /**
+ * @return {@code True} if object has not yet been canceled.
+ */
+ boolean onTimeout() {
+ if (done.compareAndSet(false, true)) {
+ // Close socket - timeout occurred.
+ U.closeQuiet(sock);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return End time.
+ */
+ long endTime() {
+ return endTime;
+ }
+
+ /**
+ * @return ID.
+ */
+ long id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SocketTimeoutObject.class, this);
+ }
+ }
+
+ /**
+ * Base class for message workers.
+ */
+ protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
+ /** Pre-allocated output stream (100K). */
+ private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);
+
+ /** Message queue. */
+ private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
+
+ /** Backed interrupted flag. */
+ private volatile boolean interrupted;
+
+ /**
+ * @param name Thread name.
+ */
+ protected MessageWorkerAdapter(String name) {
+ super(gridName, name, log);
+
+ setPriority(threadPri);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ if (log.isDebugEnabled())
+ log.debug("Message worker started [locNodeId=" + locNodeId + ']');
+
+ while (!isInterrupted()) {
+ TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
+
+ if (msg == null)
+ continue;
+
+ processMessage(msg);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void interrupt() {
+ interrupted = true;
+
+ super.interrupt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isInterrupted() {
+ return interrupted || super.isInterrupted();
+ }
+
+ /**
+ * @return Current queue size.
+ */
+ int queueSize() {
+ return queue.size();
+ }
+
+ /**
+ * Adds message to queue: heartbeat messages go to the head of the queue, all other messages to the tail.
+ *
+ * @param msg Message to add.
+ */
+ void addMessage(TcpDiscoveryAbstractMessage msg) {
+ assert msg != null;
+
+ if (msg instanceof TcpDiscoveryHeartbeatMessage)
+ queue.addFirst(msg);
+ else
+ queue.add(msg);
+
+ if (log.isDebugEnabled())
+ log.debug("Message has been added to queue: " + msg);
+ }
+
+ protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
+
+ /**
+ * @param sock Socket.
+ * @param msg Message.
+ * @throws IOException If IO failed.
+ * @throws GridException If marshalling failed.
+ */
+ protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+ throws IOException, GridException {
+ bout.reset();
+
+ TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
new file mode 100644
index 0000000..5043d78
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -0,0 +1,267 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.mbean.*;
+import org.apache.ignite.spi.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Management bean for {@link TcpDiscoverySpi}.
+ */
+public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
+ /**
+ * Gets delay between heartbeat messages sent by coordinator.
+ *
+ * @return Time period in milliseconds.
+ */
+ @IgniteMBeanDescription("Heartbeat frequency.")
+ public long getHeartbeatFrequency();
+
+ /**
+ * Gets current SPI state.
+ *
+ * @return Current SPI state.
+ */
+ @IgniteMBeanDescription("SPI state.")
+ public String getSpiState();
+
+ /**
+ * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation).
+ *
+ * @return IPFinder (string representation).
+ */
+ @IgniteMBeanDescription("IP Finder.")
+ public String getIpFinderFormatted();
+
+ /**
+ * Gets number of connection attempts.
+ *
+ * @return Number of connection attempts.
+ */
+ @IgniteMBeanDescription("Reconnect count.")
+ public int getReconnectCount();
+
+ /**
+ * Gets network timeout.
+ *
+ * @return Network timeout.
+ */
+ @IgniteMBeanDescription("Network timeout.")
+ public long getNetworkTimeout();
+
+ /**
+ * Gets local TCP port SPI listens to.
+ *
+ * @return Local TCP port.
+ */
+ @IgniteMBeanDescription("Local TCP port.")
+ public int getLocalPort();
+
+ /**
+ * Gets local TCP port range.
+ *
+ * @return Local port range.
+ */
+ @IgniteMBeanDescription("Local TCP port range.")
+ public int getLocalPortRange();
+
+ /**
+ * Gets max heartbeats count node can miss without initiating status check.
+ *
+ * @return Max missed heartbeats.
+ */
+ @IgniteMBeanDescription("Max missed heartbeats.")
+ public int getMaxMissedHeartbeats();
+
+ /**
+ * Gets max heartbeats count node can miss without failing client node.
+ *
+ * @return Max missed client heartbeats.
+ */
+ @IgniteMBeanDescription("Max missed client heartbeats.")
+ public int getMaxMissedClientHeartbeats();
+
+ /**
+ * Gets thread priority. All threads within SPI will be started with it.
+ *
+ * @return Thread priority.
+ */
+ @IgniteMBeanDescription("Threads priority.")
+ public int getThreadPriority();
+
+ /**
+ * Gets IP finder clean frequency.
+ *
+ * @return IP finder clean frequency.
+ */
+ @IgniteMBeanDescription("IP finder clean frequency.")
+ public long getIpFinderCleanFrequency();
+
+ /**
+ * Gets statistics print frequency.
+ *
+ * @return Statistics print frequency in milliseconds.
+ */
+ @IgniteMBeanDescription("Statistics print frequency.")
+ public long getStatisticsPrintFrequency();
+
+ /**
+ * Gets message worker queue current size.
+ *
+ * @return Message worker queue current size.
+ */
+ @IgniteMBeanDescription("Message worker queue current size.")
+ public int getMessageWorkerQueueSize();
+
+ /**
+ * Gets joined nodes count.
+ *
+ * @return Nodes joined count.
+ */
+ @IgniteMBeanDescription("Nodes joined count.")
+ public long getNodesJoined();
+
+ /**
+ * Gets left nodes count.
+ *
+ * @return Left nodes count.
+ */
+ @IgniteMBeanDescription("Nodes left count.")
+ public long getNodesLeft();
+
+ /**
+ * Gets failed nodes count.
+ *
+ * @return Failed nodes count.
+ */
+ @IgniteMBeanDescription("Nodes failed count.")
+ public long getNodesFailed();
+
+ /**
+ * Gets pending messages registered count.
+ *
+ * @return Pending messages registered count.
+ */
+ @IgniteMBeanDescription("Pending messages registered.")
+ public long getPendingMessagesRegistered();
+
+ /**
+ * Gets pending messages discarded count.
+ *
+ * @return Pending messages discarded count.
+ */
+ @IgniteMBeanDescription("Pending messages discarded.")
+ public long getPendingMessagesDiscarded();
+
+ /**
+ * Gets avg message processing time.
+ *
+ * @return Avg message processing time.
+ */
+ @IgniteMBeanDescription("Avg message processing time.")
+ public long getAvgMessageProcessingTime();
+
+ /**
+ * Gets max message processing time.
+ *
+ * @return Max message processing time.
+ */
+ @IgniteMBeanDescription("Max message processing time.")
+ public long getMaxMessageProcessingTime();
+
+ /**
+ * Gets total received messages count.
+ *
+ * @return Total received messages count.
+ */
+ @IgniteMBeanDescription("Total received messages count.")
+ public int getTotalReceivedMessages();
+
+ /**
+ * Gets received messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ @IgniteMBeanDescription("Received messages by type.")
+ public Map<String, Integer> getReceivedMessages();
+
+ /**
+ * Gets total processed messages count.
+ *
+ * @return Total processed messages count.
+ */
+ @IgniteMBeanDescription("Total processed messages count.")
+ public int getTotalProcessedMessages();
+
+ /**
+ * Gets processed messages counts (grouped by type).
+ *
+ * @return Map containing message types and respective counts.
+ */
+ @IgniteMBeanDescription("Processed messages by type.")
+ public Map<String, Integer> getProcessedMessages();
+
+ /**
+ * Gets time local node has been coordinator since.
+ *
+ * @return Time local node is coordinator since.
+ */
+ @IgniteMBeanDescription("Local node is coordinator since.")
+ public long getCoordinatorSinceTimestamp();
+
+ /**
+ * Gets current coordinator.
+ *
+ * @return Current coordinator node ID (may be {@code null}).
+ */
+ @IgniteMBeanDescription("Coordinator node ID.")
+ @Nullable public UUID getCoordinator();
+
+ /**
+ * Gets message acknowledgement timeout.
+ *
+ * @return Message acknowledgement timeout.
+ */
+ @IgniteMBeanDescription("Message acknowledgement timeout.")
+ public long getAckTimeout();
+
+ /**
+ * Gets maximum message acknowledgement timeout.
+ *
+ * @return Maximum message acknowledgement timeout.
+ */
+ @IgniteMBeanDescription("Maximum message acknowledgement timeout.")
+ public long getMaxAckTimeout();
+
+ /**
+ * Gets socket timeout.
+ *
+ * @return Socket timeout.
+ */
+ @IgniteMBeanDescription("Socket timeout.")
+ public long getSocketTimeout();
+
+ /**
+ * Gets join timeout.
+ *
+ * @return Join timeout.
+ */
+ @IgniteMBeanDescription("Join timeout.")
+ public long getJoinTimeout();
+
+ /**
+ * Dumps debug info using configured logger.
+ */
+ @IgniteMBeanDescription("Dump debug info.")
+ public void dumpDebugInfo();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
new file mode 100644
index 0000000..5658545
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -0,0 +1,443 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.product.*;
+import org.gridgain.grid.kernal.*;
+import org.apache.ignite.spi.discovery.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import static org.gridgain.grid.kernal.GridNodeAttributes.*;
+
+/**
+ * Node for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}.
+ * <p>
+ * <strong>This class is not intended for public use</strong> and has been made
+ * <tt>public</tt> due to certain limitations of Java technology.
+ */
+public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements ClusterNode,
+ Comparable<TcpDiscoveryNode>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Node ID. */
+ private UUID id;
+
+ /** Consistent ID. */
+ private Object consistentId;
+
+ /** Node attributes. */
+ @GridToStringExclude
+ private Map<String, Object> attrs;
+
+ /** Internal discovery addresses as strings. */
+ @GridToStringInclude
+ private Collection<String> addrs;
+
+ /** Internal discovery host names as strings. */
+ private Collection<String> hostNames;
+
+ /** */
+ @GridToStringInclude
+ private Collection<InetSocketAddress> sockAddrs;
+
+ /** */
+ @GridToStringInclude
+ private int discPort;
+
+ /** Node metrics. */
+ @GridToStringExclude
+ private volatile ClusterNodeMetrics metrics;
+
+ /** Node order in the topology. */
+ private volatile long order;
+
+ /** Node order in the topology (internal). */
+ private volatile long intOrder;
+
+ /** The most recent time when heartbeat message was received from the node. */
+ @GridToStringExclude
+ private volatile long lastUpdateTime = U.currentTimeMillis();
+
+ /** Metrics provider (transient). */
+ @GridToStringExclude
+ private DiscoveryMetricsProvider metricsProvider;
+
+ /** Visible flag (transient). */
+ @GridToStringExclude
+ private boolean visible;
+
+ /** Grid local node flag (transient). */
+ private boolean loc;
+
+ /** Version. */
+ private IgniteProductVersion ver;
+
+ /** Alive check (used by clients). */
+ @GridToStringExclude
+ private transient int aliveCheck;
+
+ /** Client router node ID. */
+ @GridToStringExclude
+ private UUID clientRouterNodeId;
+
+ /**
+ * Public default no-arg constructor for {@link Externalizable} interface.
+ */
+ public TcpDiscoveryNode() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param id Node Id.
+ * @param addrs Addresses.
+ * @param hostNames Host names.
+ * @param discPort Port.
+ * @param metricsProvider Metrics provider.
+ * @param ver Version.
+ */
+ public TcpDiscoveryNode(UUID id, Collection<String> addrs, Collection<String> hostNames, int discPort,
+ DiscoveryMetricsProvider metricsProvider, IgniteProductVersion ver) {
+ assert id != null;
+ assert !F.isEmpty(addrs);
+ assert metricsProvider != null;
+ assert ver != null;
+
+ this.id = id;
+ this.addrs = addrs;
+ this.hostNames = hostNames;
+ this.discPort = discPort;
+ this.metricsProvider = metricsProvider;
+ this.ver = ver;
+
+ consistentId = U.consistentId(addrs, discPort);
+
+ metrics = metricsProvider.getMetrics();
+ sockAddrs = U.toSocketAddresses(this, discPort);
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object consistentId() {
+ return consistentId;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public <T> T attribute(String name) {
+ // Even though discovery SPI removes this attribute after authentication, keep this check for safety.
+ if (GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name))
+ return null;
+
+ return (T)attrs.get(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Object> attributes() {
+ // Even though discovery SPI removes this attribute after authentication, keep this check for safety.
+ return F.view(attrs, new IgnitePredicate<String>() {
+ @Override public boolean apply(String s) {
+ return !GridNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s);
+ }
+ });
+ }
+
+ /**
+ * Sets node attributes.
+ *
+ * @param attrs Node attributes.
+ */
+ public void setAttributes(Map<String, Object> attrs) {
+ this.attrs = U.sealMap(attrs);
+ }
+
+ /**
+ * Gets node attributes without filtering.
+ *
+ * @return Node attributes without filtering.
+ */
+ public Map<String, Object> getAttributes() {
+ return attrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNodeMetrics metrics() {
+ if (metricsProvider != null)
+ metrics = metricsProvider.getMetrics();
+
+ return metrics;
+ }
+
+ /**
+ * Sets node metrics.
+ *
+ * @param metrics Node metrics.
+ */
+ public void setMetrics(ClusterNodeMetrics metrics) {
+ assert metrics != null;
+
+ this.metrics = metrics;
+ }
+
+ /**
+ * @return Internal order.
+ */
+ public long internalOrder() {
+ return intOrder;
+ }
+
+ /**
+ * @param intOrder Internal order of the node.
+ */
+ public void internalOrder(long intOrder) {
+ assert intOrder > 0;
+
+ this.intOrder = intOrder;
+ }
+
+ /**
+ * @return Order.
+ */
+ @Override public long order() {
+ return order;
+ }
+
+ /**
+ * @param order Order of the node.
+ */
+ public void order(long order) {
+ assert order >= 0 : "Order is invalid: " + this;
+
+ this.order = order;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteProductVersion version() {
+ return ver;
+ }
+
+ /**
+ * @param ver Version.
+ */
+ public void version(IgniteProductVersion ver) {
+ assert ver != null;
+
+ this.ver = ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> addresses() {
+ return addrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isLocal() {
+ return loc;
+ }
+
+ /**
+ * @param loc Grid local node flag.
+ */
+ public void local(boolean loc) {
+ this.loc = loc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDaemon() {
+ return "true".equalsIgnoreCase((String)attribute(ATTR_DAEMON));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> hostNames() {
+ return hostNames;
+ }
+
+ /**
+ * @return Discovery port.
+ */
+ public int discoveryPort() {
+ return discPort;
+ }
+
+ /**
+ * @return Addresses that could be used by discovery.
+ */
+ public Collection<InetSocketAddress> socketAddresses() {
+ return sockAddrs;
+ }
+
+ /**
+ * Gets node last update time.
+ *
+ * @return Time of the last heartbeat.
+ */
+ public long lastUpdateTime() {
+ return lastUpdateTime;
+ }
+
+ /**
+ * Sets node last update.
+ *
+ * @param lastUpdateTime Time of last metrics update.
+ */
+ public void lastUpdateTime(long lastUpdateTime) {
+ assert lastUpdateTime > 0;
+
+ this.lastUpdateTime = lastUpdateTime;
+ }
+
+ /**
+ * Gets visible flag.
+ *
+ * @return {@code true} if node is in visible state.
+ */
+ public boolean visible() {
+ return visible;
+ }
+
+ /**
+ * Sets visible flag.
+ *
+ * @param visible {@code true} if node is in visible state.
+ */
+ public void visible(boolean visible) {
+ this.visible = visible;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClient() {
+ return clientRouterNodeId != null;
+ }
+
+ /**
+ * Decrements alive check value and returns new one.
+ *
+ * @return Alive check value.
+ */
+ public int decrementAliveCheck() {
+ assert isClient();
+
+ return --aliveCheck;
+ }
+
+ /**
+ * @param aliveCheck Alive check value.
+ */
+ public void aliveCheck(int aliveCheck) {
+ assert isClient();
+
+ this.aliveCheck = aliveCheck;
+ }
+
+ /**
+ * @return Client router node ID.
+ */
+ public UUID clientRouterNodeId() {
+ return clientRouterNodeId;
+ }
+
+ /**
+ * @param clientRouterNodeId Client router node ID.
+ */
+ public void clientRouterNodeId(UUID clientRouterNodeId) {
+ this.clientRouterNodeId = clientRouterNodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(@Nullable TcpDiscoveryNode node) {
+ if (node == null)
+ return 1;
+
+ if (internalOrder() == node.internalOrder())
+ assert id().equals(node.id()) : "Duplicate order [this=" + this + ", other=" + node + ']';
+
+ return internalOrder() < node.internalOrder() ? -1 : internalOrder() > node.internalOrder() ? 1 :
+ id().compareTo(node.id());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeUuid(out, id);
+ U.writeMap(out, attrs);
+ U.writeCollection(out, addrs);
+ U.writeCollection(out, hostNames);
+ out.writeInt(discPort);
+
+ byte[] mtr = null;
+
+ if (metrics != null) {
+ mtr = new byte[DiscoveryMetricsHelper.METRICS_SIZE];
+
+ DiscoveryMetricsHelper.serialize(mtr, 0, metrics);
+ }
+
+ U.writeByteArray(out, mtr);
+
+ out.writeLong(order);
+ out.writeLong(intOrder);
+ out.writeObject(ver);
+ U.writeUuid(out, clientRouterNodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // Fields are read in exactly the order writeExternal() writes them.
+ id = U.readUuid(in);
+
+ attrs = U.sealMap(U.<String, Object>readMap(in));
+ addrs = U.readCollection(in);
+ hostNames = U.readCollection(in);
+ discPort = in.readInt();
+
+ // Socket addresses and consistent ID are not in the stream: they are derived from the values read above.
+ sockAddrs = U.toSocketAddresses(this, discPort);
+
+ consistentId = U.consistentId(addrs, discPort);
+
+ // Metrics travel as a byte array; writeExternal() writes null when the node had no metrics.
+ byte[] mtr = U.readByteArray(in);
+
+ if (mtr != null)
+ metrics = DiscoveryMetricsHelper.deserialize(mtr, 0);
+
+ order = in.readLong();
+ intOrder = in.readLong();
+ ver = (IgniteProductVersion)in.readObject();
+ clientRouterNodeId = U.readUuid(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return F.eqNodes(this, o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryNode.class, this, "isClient", isClient());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
new file mode 100644
index 0000000..9c57f4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -0,0 +1,636 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Convenient way to represent topology for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
+ */
+public class TcpDiscoveryNodesRing {
+ /** Visible nodes filter. */
+ private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
+ @Override public boolean apply(TcpDiscoveryNode node) {
+ return node.visible();
+ }
+ };
+
+ /** Client nodes filter. */
+ private static final PN CLIENT_NODES = new PN() {
+ @Override public boolean apply(ClusterNode node) {
+ return node.isClient();
+ }
+ };
+
+ /** Local node. */
+ private TcpDiscoveryNode locNode;
+
+ /** All nodes in topology. */
+ @GridToStringInclude
+ private NavigableSet<TcpDiscoveryNode> nodes = new TreeSet<>();
+
+ /** All started nodes. */
+ @GridToStringExclude
+ private Map<UUID, TcpDiscoveryNode> nodesMap = new HashMap<>();
+
+ /** Current topology version */
+ private long topVer;
+
+ /** */
+ private long nodeOrder;
+
+ /** Lock. */
+ @GridToStringExclude
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ /**
+ * Sets local node.
+ *
+ * @param locNode Local node.
+ */
+ public void localNode(TcpDiscoveryNode locNode) {
+ assert locNode != null;
+
+ rwLock.writeLock().lock();
+
+ try {
+ this.locNode = locNode;
+
+ clear();
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Gets all nodes in the topology.
+ *
+ * @return Collection of all nodes.
+ */
+ public Collection<TcpDiscoveryNode> allNodes() {
+ return nodes();
+ }
+
+ /**
+ * Gets visible nodes in the topology.
+ *
+ * @return Collection of visible nodes.
+ */
+ public Collection<TcpDiscoveryNode> visibleNodes() {
+ return nodes(VISIBLE_NODES);
+ }
+
+ /**
+ * Gets remote nodes.
+ *
+ * @return Collection of remote nodes in grid.
+ */
+ public Collection<TcpDiscoveryNode> remoteNodes() {
+ return nodes(F.remoteNodes(locNode.id()));
+ }
+
+ /**
+ * Gets visible remote nodes in the topology.
+ *
+ * @return Collection of visible remote nodes.
+ */
+ public Collection<TcpDiscoveryNode> visibleRemoteNodes() {
+ return nodes(VISIBLE_NODES, F.remoteNodes(locNode.id()));
+ }
+
+ /**
+ * @return Client nodes.
+ */
+ public Collection<TcpDiscoveryNode> clientNodes() {
+ return nodes(CLIENT_NODES);
+ }
+
+ /**
+ * Checks whether the topology has remote nodes in.
+ *
+ * @return {@code true} if the topology has remote nodes in.
+ */
+ public boolean hasRemoteNodes() {
+ rwLock.readLock().lock();
+
+ try {
+ return nodes.size() > 1;
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Adds node to topology, also initializes node last update time with current
+ * system time.
+ *
+ * @param node Node to add. Its internal order must be positive.
+ * @return {@code true} if the node was added, {@code false} if a node with the same ID
+ * is already present in the topology.
+ */
+ public boolean add(TcpDiscoveryNode node) {
+ assert node != null;
+ assert node.internalOrder() > 0;
+
+ rwLock.writeLock().lock();
+
+ try {
+ // A node with this ID is already registered: nothing to do.
+ if (nodesMap.containsKey(node.id()))
+ return false;
+
+ // New nodes may only be appended after the node with the current max internal order.
+ assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " +
+ "[ring=" + this + ", node=" + node + ']';
+
+ nodesMap.put(node.id(), node);
+
+ // Work on a fresh copy of the set instead of mutating the current instance.
+ nodes = new TreeSet<>(nodes);
+
+ node.lastUpdateTime(U.currentTimeMillis());
+
+ nodes.add(node);
+
+ // Remember the internal order of the most recently added node.
+ nodeOrder = node.internalOrder();
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+
+ return true;
+ }
+
+ /**
+  * Gets internal order of the last node in the ring (nodes are sorted by internal order,
+  * so this is the maximum one).
+  *
+  * @return Max internal order, or {@code -1} if the ring contains no nodes.
+  */
+ public long maxInternalOrder() {
+     rwLock.readLock().lock();
+
+     try {
+         // TreeSet.last() never returns null: it throws NoSuchElementException on an empty set,
+         // so emptiness has to be checked explicitly.
+         return nodes.isEmpty() ? -1 : nodes.last().internalOrder();
+     }
+     finally {
+         rwLock.readLock().unlock();
+     }
+ }
+
+ /**
+ * Restores topology from parameters values.
+ * <p>
+ * This method is called when new node receives topology from coordinator.
+ * In this case all nodes received are remote for local.
+ * <p>
+ * Also initializes nodes last update time with current system time.
+ *
+ * @param nodes List of remote nodes.
+ * @param topVer Topology version.
+ */
+ public void restoreTopology(Iterable<TcpDiscoveryNode> nodes, long topVer) {
+ assert !F.isEmpty(nodes);
+ assert topVer > 0;
+
+ rwLock.writeLock().lock();
+
+ try {
+ locNode.internalOrder(topVer);
+
+ clear();
+
+ boolean firstAdd = true;
+
+ for (TcpDiscoveryNode node : nodes) {
+ if (nodesMap.containsKey(node.id()))
+ continue;
+
+ nodesMap.put(node.id(), node);
+
+ if (firstAdd) {
+ this.nodes = new TreeSet<>(this.nodes);
+
+ firstAdd = false;
+ }
+
+ node.lastUpdateTime(U.currentTimeMillis());
+
+ this.nodes.add(node);
+ }
+
+ nodeOrder = topVer;
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Finds node by ID.
+ *
+ * @param nodeId Node id to find.
+ * @return Node with ID provided or {@code null} if not found.
+ */
+ @Nullable public TcpDiscoveryNode node(UUID nodeId) {
+ assert nodeId != null;
+
+ rwLock.readLock().lock();
+
+ try {
+ return nodesMap.get(nodeId);
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Removes node from the topology. The local node must not be removed through this method.
+ *
+ * @param nodeId ID of the node to remove.
+ * @return Removed node, or {@code null} if no node with the given ID was in the topology.
+ */
+ @Nullable public TcpDiscoveryNode removeNode(UUID nodeId) {
+ assert nodeId != null;
+ assert !locNode.id().equals(nodeId);
+
+ rwLock.writeLock().lock();
+
+ try {
+ TcpDiscoveryNode rmv = nodesMap.remove(nodeId);
+
+ if (rmv != null) {
+ // Work on a fresh copy of the set instead of mutating the current instance.
+ nodes = new TreeSet<>(nodes);
+
+ nodes.remove(rmv);
+ }
+
+ return rmv;
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /**
+  * Removes the nodes with the given IDs from the topology. IDs that are not
+  * present in the topology are silently skipped.
+  *
+  * @param nodeIds IDs of the nodes to remove (must not be empty).
+  * @return Nodes that were actually removed; empty collection if none of the IDs was found.
+  */
+ public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) {
+     assert !F.isEmpty(nodeIds);
+
+     rwLock.writeLock().lock();
+
+     try {
+         Collection<TcpDiscoveryNode> removed = null;
+
+         for (UUID nodeId : nodeIds) {
+             TcpDiscoveryNode node = nodesMap.remove(nodeId);
+
+             if (node == null)
+                 continue;
+
+             // First hit: switch to a fresh copy of the set and allocate the result lazily.
+             if (removed == null) {
+                 nodes = new TreeSet<>(nodes);
+
+                 removed = new ArrayList<>(nodeIds.size());
+             }
+
+             nodes.remove(node);
+
+             removed.add(node);
+         }
+
+         if (removed != null)
+             return removed;
+
+         return Collections.<TcpDiscoveryNode>emptyList();
+     }
+     finally {
+         rwLock.writeLock().unlock();
+     }
+ }
+
+ /**
+  * Resets the ring so that it contains only the local node (or nothing, if the
+  * local node has not been set yet) and zeroes node order and topology version.
+  * <p>
+  * Intended for the case when SPI gets disconnected from topology and is going
+  * to reconnect afterwards.
+  */
+ public void clear() {
+     rwLock.writeLock().lock();
+
+     try {
+         NavigableSet<TcpDiscoveryNode> freshNodes = new TreeSet<>();
+         Map<UUID, TcpDiscoveryNode> freshMap = new HashMap<>();
+
+         if (locNode != null) {
+             freshNodes.add(locNode);
+             freshMap.put(locNode.id(), locNode);
+         }
+
+         nodes = freshNodes;
+         nodesMap = freshMap;
+
+         topVer = 0;
+         nodeOrder = 0;
+     }
+     finally {
+         rwLock.writeLock().unlock();
+     }
+ }
+
+ /**
+ * Finds coordinator in the topology.
+ *
+ * @return Coordinator node that gives versions to topology (node with the smallest order).
+ */
+ @Nullable public TcpDiscoveryNode coordinator() {
+ rwLock.readLock().lock();
+
+ try {
+ if (F.isEmpty(nodes))
+ return null;
+
+ return coordinator(null);
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Finds coordinator in the topology filtering excluded nodes from the search.
+ * <p>
+ * This may be used when handling current coordinator leave or failure.
+ *
+ * @param excluded Nodes to exclude from the search (optional).
+ * @return Coordinator node among remaining nodes or {@code null} if all nodes are excluded.
+ */
+ @Nullable public TcpDiscoveryNode coordinator(@Nullable Collection<TcpDiscoveryNode> excluded) {
+ rwLock.readLock().lock();
+
+ try {
+ Collection<TcpDiscoveryNode> filtered = serverNodes(excluded);
+
+ if (F.isEmpty(filtered))
+ return null;
+
+ return Collections.min(filtered);
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Finds next node in the topology.
+ *
+ * @return Next node.
+ */
+ @Nullable public TcpDiscoveryNode nextNode() {
+ rwLock.readLock().lock();
+
+ try {
+ if (nodes.size() < 2)
+ return null;
+
+ return nextNode(null);
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /**
+  * Finds the server node that follows the local node in the ring, ignoring excluded nodes.
+  * If the local node is the last one (or is not among the server nodes), the first
+  * server node is returned.
+  * <p>
+  * This may be used when detecting and handling nodes failure.
+  *
+  * @param excluded Nodes to exclude from the search (optional). If provided,
+  *      cannot contain local node.
+  * @return Next node or {@code null} if all nodes were filtered out or
+  *      topology contains less than two nodes.
+  */
+ @Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+     assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+
+     rwLock.readLock().lock();
+
+     try {
+         Collection<TcpDiscoveryNode> srvNodes = serverNodes(excluded);
+
+         if (srvNodes.size() < 2)
+             return null;
+
+         boolean locSeen = false;
+
+         for (TcpDiscoveryNode candidate : srvNodes) {
+             // The node right after the local one is the answer.
+             if (locSeen)
+                 return candidate;
+
+             locSeen = locNode.equals(candidate);
+         }
+
+         // Local node is last (or absent): wrap around to the head of the ring.
+         return F.first(srvNodes);
+     }
+     finally {
+         rwLock.readLock().unlock();
+     }
+ }
+
+ /**
+ * Finds previous node in the topology.
+ *
+ * @return Previous node.
+ */
+ @Nullable public TcpDiscoveryNode previousNode() {
+ rwLock.readLock().lock();
+
+ try {
+ if (nodes.size() < 2)
+ return null;
+
+ return previousNode(null);
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /**
+  * Finds previous node in the topology filtering excluded nodes from search.
+  * <p>
+  * The previous node is the server node that immediately precedes the local node in ring
+  * order; if the local node is the first one (or is not among the server nodes), the search
+  * wraps around to the last server node.
+  *
+  * @param excluded Nodes to exclude from the search (optional). If provided,
+  *      cannot contain local node.
+  * @return Previous node or {@code null} if all nodes were filtered out or
+  *      topology contains less than two nodes.
+  */
+ @Nullable public TcpDiscoveryNode previousNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
+     assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+
+     rwLock.readLock().lock();
+
+     try {
+         TcpDiscoveryNode prev = null; // Closest server node seen before the local node.
+         TcpDiscoveryNode last = null; // Last server node in ring order (wrap-around target).
+
+         boolean locPassed = false;
+
+         int cnt = 0;
+
+         // Single forward pass over the filtered server nodes (ascending ring order).
+         for (TcpDiscoveryNode node : serverNodes(excluded)) {
+             cnt++;
+
+             last = node;
+
+             if (locNode.equals(node))
+                 locPassed = true;
+             else if (!locPassed)
+                 prev = node;
+         }
+
+         if (cnt < 2)
+             return null;
+
+         // No predecessor means the local node heads the ring: wrap around to the tail.
+         return prev != null ? prev : last;
+     }
+     finally {
+         rwLock.readLock().unlock();
+     }
+ }
+
+ /**
+ * Gets current topology version.
+ *
+ * @return Current topology version.
+ */
+ public long topologyVersion() {
+ rwLock.readLock().lock();
+
+ try {
+ return topVer;
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /**
+  * Sets new topology version.
+  *
+  * @param topVer New topology version (should be greater than current, otherwise no-op).
+  * @return {@code True} if topology version has been changed.
+  */
+ public boolean topologyVersion(long topVer) {
+     rwLock.writeLock().lock();
+
+     try {
+         // Version only moves forward; stale or repeated values are ignored.
+         if (this.topVer >= topVer)
+             return false;
+
+         this.topVer = topVer;
+
+         return true;
+     }
+     finally {
+         rwLock.writeLock().unlock();
+     }
+ }
+
+ /**
+ * Increments topology version and gets new value.
+ *
+ * @return Topology version (incremented).
+ */
+ public long incrementTopologyVersion() {
+ rwLock.writeLock().lock();
+
+ try {
+ return ++topVer;
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Increments node order counter and gets new value.
+ *
+ * @return Next node order (counter value after increment).
+ */
+ public long nextNodeOrder() {
+ rwLock.writeLock().lock();
+
+ try {
+ // Counter is unset (zero): seed it from the internal order of the last node in the set.
+ if (nodeOrder == 0) {
+ TcpDiscoveryNode last = nodes.last();
+
+ assert last != null;
+
+ nodeOrder = last.internalOrder();
+ }
+
+ return ++nodeOrder;
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * @param p Filters.
+ * @return Unmodifiable collection of nodes.
+ */
+ private Collection<TcpDiscoveryNode> nodes(IgnitePredicate<? super TcpDiscoveryNode>... p) {
+ rwLock.readLock().lock();
+
+ try {
+ List<TcpDiscoveryNode> list = U.arrayList(nodes, p);
+
+ return Collections.unmodifiableCollection(list);
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Gets server nodes from topology.
+ *
+ * @param excluded Nodes to exclude from the search (optional).
+ * @return Collection of server nodes.
+ */
+ private Collection<TcpDiscoveryNode> serverNodes(@Nullable final Collection<TcpDiscoveryNode> excluded) {
+ final boolean excludedEmpty = F.isEmpty(excluded);
+
+ return F.view(nodes, new P1<TcpDiscoveryNode>() {
+ @Override public boolean apply(TcpDiscoveryNode node) {
+ return !node.isClient() && (excludedEmpty || !excluded.contains(node));
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ rwLock.readLock().lock();
+
+ try {
+ return S.toString(TcpDiscoveryNodesRing.class, this);
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
new file mode 100644
index 0000000..693ec41
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoverySpiState.java
@@ -0,0 +1,45 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+/**
+ * State of local node {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}.
+ */
+public enum TcpDiscoverySpiState {
+ /** */
+ DISCONNECTED,
+
+ /** */
+ CONNECTING,
+
+ /** */
+ CONNECTED,
+
+ /** */
+ DISCONNECTING,
+
+ /** */
+ STOPPING,
+
+ /** */
+ LEFT,
+
+ /** */
+ DUPLICATE_ID,
+
+ /** */
+ AUTH_FAILED,
+
+ /** */
+ CHECK_FAILED,
+
+ /** */
+ LOOPBACK_PROBLEM
+}