You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 15:07:32 UTC
[31/52] [abbrv] incubator-ignite git commit: # Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ef8f69b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java
deleted file mode 100644
index 40e4418..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/TcpDiscoverySpi.java
+++ /dev/null
@@ -1,5144 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.spi.discovery.tcp;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.*;
-import org.gridgain.grid.kernal.managers.security.*;
-import org.gridgain.grid.security.*;
-import org.gridgain.grid.spi.discovery.*;
-import org.gridgain.grid.spi.discovery.tcp.internal.*;
-import org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.*;
-import org.gridgain.grid.spi.discovery.tcp.messages.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.future.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.tostring.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.text.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.events.IgniteEventType.*;
-import static org.gridgain.grid.kernal.GridNodeAttributes.*;
-import static org.apache.ignite.spi.IgnitePortProtocol.*;
-import static org.gridgain.grid.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
-import static org.gridgain.grid.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
-import static org.gridgain.grid.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
-
-/**
- * Discovery SPI implementation that uses TCP/IP for node discovery.
- * <p>
- * Nodes are organized in a ring, so almost all network exchange (except for a few cases) is
- * done across it.
- * <p>
- * At startup SPI tries to send a message about its own start to a random IP address taken from
- * {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (it stops when a send succeeds),
- * and then this info goes to the coordinator. The coordinator processes the join request
- * and issues node added messages, so all other nodes then receive info about the new node.
- * <h1 class="header">Configuration</h1>
- * <h2 class="header">Mandatory</h2>
- * There are no mandatory configuration parameters.
- * <h2 class="header">Optional</h2>
- * The following configuration parameters are optional:
- * <ul>
- * <li>IP finder to share info about nodes IP addresses
- * (see {@link #setIpFinder(org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder)}).
- * See the following IP finder implementations for details on configuration:
- * <ul>
- * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder}</li>
- * <li>{@gglink org.gridgain.grid.spi.discovery.tcp.ipfinder.s3.GridTcpDiscoveryS3IpFinder}</li>
- * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder}</li>
- * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder}</li>
- * <li>{@link org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} - default</li>
- * </ul>
- * </li>
- * </ul>
- * <ul>
- * </li>
- * <li>Local address (see {@link #setLocalAddress(String)})</li>
- * <li>Local port to bind to (see {@link #setLocalPort(int)})</li>
- * <li>Local port range to try binding to if previous ports are in use
- * (see {@link #setLocalPortRange(int)})</li>
- * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)})</li>
- * <li>Max missed heartbeats (see {@link #setMaxMissedHeartbeats(int)})</li>
- * <li>Number of times node tries to (re)establish connection to another node
- * (see {@link #setReconnectCount(int)})</li>
- * <li>Network timeout (see {@link #setNetworkTimeout(long)})</li>
- * <li>Socket timeout (see {@link #setSocketTimeout(long)})</li>
- * <li>Message acknowledgement timeout (see {@link #setAckTimeout(long)})</li>
- * <li>Maximum message acknowledgement timeout (see {@link #setMaxAckTimeout(long)})</li>
- * <li>Join timeout (see {@link #setJoinTimeout(long)})</li>
- * <li>Thread priority for threads started by SPI (see {@link #setThreadPriority(int)})</li>
- * <li>IP finder clean frequency (see {@link #setIpFinderCleanFrequency(long)})</li>
- * <li>Statistics print frequency (see {@link #setStatisticsPrintFrequency(long)})</li>
- * </ul>
- * <h2 class="header">Java Example</h2>
- * <pre name="code" class="java">
- * TcpDiscoverySpi spi = new TcpDiscoverySpi();
- *
- * TcpDiscoveryVmIpFinder finder =
- * new TcpDiscoveryVmIpFinder();
- *
- * spi.setIpFinder(finder);
- *
- * GridConfiguration cfg = new GridConfiguration();
- *
- * // Override default discovery SPI.
- * cfg.setDiscoverySpi(spi);
- *
- * // Start grid.
- * GridGain.start(cfg);
- * </pre>
- * <h2 class="header">Spring Example</h2>
- * TcpDiscoverySpi can be configured from Spring XML configuration file:
- * <pre name="code" class="xml">
- * <bean id="grid.custom.cfg" class="org.gridgain.grid.GridConfiguration" singleton="true">
- * ...
- * <property name="discoverySpi">
- * <bean class="org.gridgain.grid.spi.discovery.tcp.TcpDiscoverySpi">
- * <property name="ipFinder">
- * <bean class="org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder" />
- * </property>
- * </bean>
- * </property>
- * ...
- * </bean>
- * </pre>
- * <p>
- * <img src="http://www.gridgain.com/images/spring-small.png">
- * <br>
- * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
- * @see org.gridgain.grid.spi.discovery.DiscoverySpi
- */
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-@IgniteSpiMultipleInstancesSupport(true)
-@DiscoverySpiOrderSupport(true)
-@DiscoverySpiHistorySupport(true)
-public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscoverySpiMBean {
- /** Default local port range (value is <tt>100</tt>). */
- public static final int DFLT_PORT_RANGE = 100;
-
- /** Default timeout for joining topology (value is <tt>0</tt>). */
- public static final long DFLT_JOIN_TIMEOUT = 0;
-
- /** Default reconnect attempts count (value is <tt>10</tt>). */
- public static final int DFLT_RECONNECT_CNT = 10;
-
- /** Default max heartbeats count node can miss without initiating status check (value is <tt>1</tt>). */
- public static final int DFLT_MAX_MISSED_HEARTBEATS = 1;
-
- /** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). */
- public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5;
-
- /** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */
- public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000;
-
- /** Default statistics print frequency in milliseconds (value is <tt>0ms</tt>). */
- public static final long DFLT_STATS_PRINT_FREQ = 0;
-
- /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
- public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
-
- /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
- public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
-
- /** Address resolver. */
- private IgniteAddressResolver addrRslvr;
-
- /** Local port which node uses. */
- private int locPort = DFLT_PORT;
-
- /** Local port range. */
- private int locPortRange = DFLT_PORT_RANGE;
-
- /** Statistics print frequency. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
- private long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
-
- /** Maximum message acknowledgement timeout. */
- private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
-
- /** Join timeout. */
- @SuppressWarnings("RedundantFieldInitialization")
- private long joinTimeout = DFLT_JOIN_TIMEOUT;
-
- /** Max heartbeats count node can miss without initiating status check. */
- private int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
-
- /** Max heartbeats count node can miss without failing client node. */
- private int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
-
- /** IP finder clean frequency. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ;
-
- /** Reconnect attempts count. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private int reconCnt = DFLT_RECONNECT_CNT;
-
- /** Grid marshaller. */
- @IgniteMarshallerResource
- private IgniteMarshaller gridMarsh;
-
- /** Nodes ring. */
- @GridToStringExclude
- private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
-
- /** Topology snapshots history. */
- private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
-
- /** Socket readers. */
- private final Collection<SocketReader> readers = new LinkedList<>();
-
- /** TCP server for discovery SPI. */
- private TcpServer tcpSrvr;
-
- /** Message worker. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private RingMessageWorker msgWorker;
-
- /** Client message workers. */
- private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
-
- /** Metrics sender. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private HeartbeatsSender hbsSnd;
-
- /** Status checker. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private CheckStatusSender chkStatusSnd;
-
- /** IP finder cleaner. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private IpFinderCleaner ipFinderCleaner;
-
- /** Statistics printer thread. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private StatisticsPrinter statsPrinter;
-
- /** Failed nodes (but still in topology). */
- private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
-
- /** Leaving nodes (but still in topology). */
- private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
-
- /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
- private boolean ipFinderHasLocAddr;
-
- /** Addresses that did not respond while join requests were being sent (for resolving concurrent start). */
- private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>();
-
- /** Addresses that incoming join requests were sent from (for resolving concurrent start). */
- private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>();
-
- /** Response on join request from coordinator (in case of duplicate ID or auth failure). */
- private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1();
-
- /** Context initialization latch. */
- @GridToStringExclude
- private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
-
- /** Node authenticator. */
- private DiscoverySpiNodeAuthenticator nodeAuth;
-
- /** Mutex. */
- private final Object mux = new Object();
-
- /** Map with proceeding ping requests. */
- private final ConcurrentMap<InetSocketAddress, IgniteFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
- new ConcurrentHashMap8<>();
-
- /** Debug mode. */
- private boolean debugMode;
-
- /** Debug messages history. */
- private int debugMsgHist = 512;
-
- /** Received messages. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private ConcurrentLinkedDeque<String> debugLog;
-
- /**
- * Sets address resolver.
- *
- * @param addrRslvr Address resolver.
- */
- @IgniteSpiConfiguration(optional = true)
- @IgniteAddressResolverResource
- public void setAddressResolver(IgniteAddressResolver addrRslvr) {
- // Injection should not override value already set by Spring or user.
- if (this.addrRslvr == null)
- this.addrRslvr = addrRslvr;
- }
-
- /**
- * Gets address resolver.
- *
- * @return Address resolver.
- */
- public IgniteAddressResolver getAddressResolver() {
- return addrRslvr;
- }
-
- /** {@inheritDoc} */
- @Override public int getReconnectCount() {
- return reconCnt;
- }
-
- /**
- * Number of times node tries to (re)establish connection to another node.
- * <p>
- * Note that SPI implementation will increase {@link #ackTimeout} by factor 2
- * on every retry.
- * <p>
- * If not specified, default is {@link #DFLT_RECONNECT_CNT}.
- *
- * @param reconCnt Number of retries during message sending.
- * @see #setAckTimeout(long)
- */
- @IgniteSpiConfiguration(optional = true)
- public void setReconnectCount(int reconCnt) {
- this.reconCnt = reconCnt;
- }
-
- /** {@inheritDoc} */
- @Override public long getMaxAckTimeout() {
- return maxAckTimeout;
- }
-
- /**
- * Sets maximum timeout for receiving acknowledgement for sent message.
- * <p>
- * If acknowledgement is not received within this timeout, sending is considered as failed
- * and SPI tries to repeat message sending. Every time SPI retries message sending, ack
- * timeout will be increased. If no acknowledgement is received and {@code maxAckTimeout}
- * is reached, then the process of message sending is considered as failed.
- * <p>
- * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}.
- *
- * @param maxAckTimeout Maximum acknowledgement timeout.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setMaxAckTimeout(long maxAckTimeout) {
- this.maxAckTimeout = maxAckTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public long getJoinTimeout() {
- return joinTimeout;
- }
-
- /**
- * Sets join timeout.
- * <p>
- * If non-shared IP finder is used and node fails to connect to
- * any address from IP finder, node keeps trying to join within this
- * timeout. If all addresses are still unresponsive, exception is thrown
- * and node startup fails.
- * <p>
- * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}.
- *
- * @param joinTimeout Join timeout ({@code 0} means wait forever).
- *
- * @see org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()
- */
- @IgniteSpiConfiguration(optional = true)
- public void setJoinTimeout(long joinTimeout) {
- this.joinTimeout = joinTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public int getLocalPort() {
- TcpDiscoveryNode locNode0 = locNode;
-
- return locNode0 != null ? locNode0.discoveryPort() : 0;
- }
-
- /**
- * Sets local port to listen to.
- * <p>
- * If not specified, default is {@link #DFLT_PORT}.
- *
- * @param locPort Local port to bind.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setLocalPort(int locPort) {
- this.locPort = locPort;
- }
-
- /** {@inheritDoc} */
- @Override public int getLocalPortRange() {
- return locPortRange;
- }
-
- /**
- * Range for local ports. Local node will try to bind on first available port
- * starting from {@link #getLocalPort()} up until
- * <tt>{@link #getLocalPort()} {@code + locPortRange}</tt>.
- * <p>
- * If not specified, default is {@link #DFLT_PORT_RANGE}.
- *
- * @param locPortRange Local port range to bind.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setLocalPortRange(int locPortRange) {
- this.locPortRange = locPortRange;
- }
-
- /** {@inheritDoc} */
- @Override public int getMaxMissedHeartbeats() {
- return maxMissedHbs;
- }
-
- /**
- * Sets max heartbeats count node can miss without initiating status check.
- * <p>
- * If not provided, default value is {@link #DFLT_MAX_MISSED_HEARTBEATS}.
- *
- * @param maxMissedHbs Max missed heartbeats.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setMaxMissedHeartbeats(int maxMissedHbs) {
- this.maxMissedHbs = maxMissedHbs;
- }
-
- /** {@inheritDoc} */
- @Override public int getMaxMissedClientHeartbeats() {
- return maxMissedClientHbs;
- }
-
- /**
- * Sets max heartbeats count node can miss without failing client node.
- * <p>
- * If not provided, default value is {@link #DFLT_MAX_MISSED_CLIENT_HEARTBEATS}.
- *
- * @param maxMissedClientHbs Max missed client heartbeats.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setMaxMissedClientHeartbeats(int maxMissedClientHbs) {
- this.maxMissedClientHbs = maxMissedClientHbs;
- }
-
- /** {@inheritDoc} */
- @Override public long getStatisticsPrintFrequency() {
- return statsPrintFreq;
- }
-
- /**
- * Sets statistics print frequency.
- * <p>
- * If not set default value is {@link #DFLT_STATS_PRINT_FREQ}.
- * 0 indicates that no print is required. If value is greater than 0 and log is
- * not quiet then statistics are printed out with INFO level.
- * <p>
- * This may be very helpful for tracing topology problems.
- *
- * @param statsPrintFreq Statistics print frequency in milliseconds.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setStatisticsPrintFrequency(long statsPrintFreq) {
- this.statsPrintFreq = statsPrintFreq;
- }
-
- /** {@inheritDoc} */
- @Override public long getIpFinderCleanFrequency() {
- return ipFinderCleanFreq;
- }
-
- /**
- * Sets IP finder clean frequency in milliseconds.
- * <p>
- * If not provided, default value is {@link #DFLT_IP_FINDER_CLEAN_FREQ}.
- *
- * @param ipFinderCleanFreq IP finder clean frequency.
- */
- @IgniteSpiConfiguration(optional = true)
- public void setIpFinderCleanFrequency(long ipFinderCleanFreq) {
- this.ipFinderCleanFreq = ipFinderCleanFreq;
- }
-
- /**
- * This method is intended for troubleshooting purposes only.
- *
- * @param debugMode {@code True} to start SPI in debug mode.
- */
- public void setDebugMode(boolean debugMode) {
- this.debugMode = debugMode;
- }
-
- /**
- * This method is intended for troubleshooting purposes only.
- *
- * @param debugMsgHist Message history log size.
- */
- public void setDebugMessageHistory(int debugMsgHist) {
- this.debugMsgHist = debugMsgHist;
- }
-
- /** {@inheritDoc} */
- @Override public String getSpiState() {
- synchronized (mux) {
- return spiState.name();
- }
- }
-
- /** {@inheritDoc} */
- @Override public long getSocketTimeout() {
- return sockTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public long getAckTimeout() {
- return ackTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public long getNetworkTimeout() {
- return netTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public int getThreadPriority() {
- return threadPri;
- }
-
- /** {@inheritDoc} */
- @Override public long getHeartbeatFrequency() {
- return hbFreq;
- }
-
- /** {@inheritDoc} */
- @Override public String getIpFinderFormatted() {
- return ipFinder.toString();
- }
-
- /** {@inheritDoc} */
- @Override public int getMessageWorkerQueueSize() {
- return msgWorker.queueSize();
- }
-
- /** {@inheritDoc} */
- @Override public long getNodesJoined() {
- return stats.joinedNodesCount();
- }
-
- /** {@inheritDoc} */
- @Override public long getNodesLeft() {
- return stats.leftNodesCount();
- }
-
- /** {@inheritDoc} */
- @Override public long getNodesFailed() {
- return stats.failedNodesCount();
- }
-
- /** {@inheritDoc} */
- @Override public long getPendingMessagesRegistered() {
- return stats.pendingMessagesRegistered();
- }
-
- /** {@inheritDoc} */
- @Override public long getPendingMessagesDiscarded() {
- return stats.pendingMessagesDiscarded();
- }
-
- /** {@inheritDoc} */
- @Override public long getAvgMessageProcessingTime() {
- return stats.avgMessageProcessingTime();
- }
-
- /** {@inheritDoc} */
- @Override public long getMaxMessageProcessingTime() {
- return stats.maxMessageProcessingTime();
- }
-
- /** {@inheritDoc} */
- @Override public int getTotalReceivedMessages() {
- return stats.totalReceivedMessages();
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, Integer> getReceivedMessages() {
- return stats.receivedMessages();
- }
-
- /** {@inheritDoc} */
- @Override public int getTotalProcessedMessages() {
- return stats.totalProcessedMessages();
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, Integer> getProcessedMessages() {
- return stats.processedMessages();
- }
-
- /** {@inheritDoc} */
- @Override public long getCoordinatorSinceTimestamp() {
- return stats.coordinatorSinceTimestamp();
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public UUID getCoordinator() {
- TcpDiscoveryNode crd = resolveCoordinator();
-
- return crd != null ? crd.id() : null;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public ClusterNode getNode(UUID nodeId) {
- assert nodeId != null;
-
- UUID locNodeId0 = locNodeId;
-
- if (locNodeId0 != null && locNodeId0.equals(nodeId))
- // Return local node directly.
- return locNode;
-
- TcpDiscoveryNode node = ring.node(nodeId);
-
- if (node != null && !node.visible())
- return null;
-
- return node;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ClusterNode> getRemoteNodes() {
- return F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleRemoteNodes());
- }
-
- /** {@inheritDoc} */
- @Override public Collection<Object> injectables() {
- Collection<Object> res = new LinkedList<>();
-
- if (ipFinder != null)
- res.add(ipFinder);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void spiStart(String gridName) throws IgniteSpiException {
- spiStart0(false);
- }
-
- /**
- * Starts or restarts SPI after stop (to reconnect).
- *
- * @param restart {@code True} if SPI is restarted after stop.
- * @throws org.apache.ignite.spi.IgniteSpiException If failed.
- */
- private void spiStart0(boolean restart) throws IgniteSpiException {
- if (!restart)
- // It is initial start.
- onSpiStart();
-
- synchronized (mux) {
- spiState = DISCONNECTED;
- }
-
- if (debugMode) {
- if (!log.isInfoEnabled())
- throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
- "in debug mode.");
-
- debugLog = new ConcurrentLinkedDeque<>();
-
- U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode.");
- }
-
- // Clear addresses collections.
- fromAddrs.clear();
- noResAddrs.clear();
-
- sockTimeoutWorker = new SocketTimeoutWorker();
- sockTimeoutWorker.start();
-
- msgWorker = new RingMessageWorker();
- msgWorker.start();
-
- tcpSrvr = new TcpServer();
-
- // Init local node.
- IgniteBiTuple<Collection<String>, Collection<String>> addrs;
-
- try {
- addrs = U.resolveLocalAddresses(locHost);
- }
- catch (IOException | GridException e) {
- throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e);
- }
-
- locNode = new TcpDiscoveryNode(
- locNodeId,
- addrs.get1(),
- addrs.get2(),
- tcpSrvr.port,
- metricsProvider,
- locNodeVer);
-
- try {
- Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
- U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())),
- locNode.discoveryPort());
-
- if (extAddrs != null)
- locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
- }
- catch (GridException e) {
- throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e);
- }
-
- locNode.setAttributes(locNodeAttrs);
-
- locNode.local(true);
-
- locNodeAddrs = getNodeAddresses(locNode);
-
- if (log.isDebugEnabled())
- log.debug("Local node initialized: " + locNode);
-
- // Start TCP server thread after local node is initialized.
- tcpSrvr.start();
-
- ring.localNode(locNode);
-
- if (ipFinder.isShared())
- registerLocalNodeAddress();
- else {
- if (F.isEmpty(ipFinder.getRegisteredAddresses()))
- throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " +
- "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " +
- "(specify list of IP addresses in configuration).");
-
- ipFinderHasLocAddr = ipFinderHasLocalAddress();
- }
-
- if (statsPrintFreq > 0 && log.isInfoEnabled()) {
- statsPrinter = new StatisticsPrinter();
- statsPrinter.start();
- }
-
- stats.onJoinStarted();
-
- joinTopology();
-
- stats.onJoinFinished();
-
- hbsSnd = new HeartbeatsSender();
- hbsSnd.start();
-
- chkStatusSnd = new CheckStatusSender();
- chkStatusSnd.start();
-
- if (ipFinder.isShared()) {
- ipFinderCleaner = new IpFinderCleaner();
- ipFinderCleaner.start();
- }
-
- if (log.isDebugEnabled() && !restart)
- log.debug(startInfo());
-
- if (restart)
- getSpiContext().registerPort(tcpSrvr.port, TCP);
- }
-
- /**
- * @throws org.apache.ignite.spi.IgniteSpiException If failed.
- */
- @SuppressWarnings("BusyWait")
- private void registerLocalNodeAddress() throws IgniteSpiException {
- // Make sure address registration succeeded.
- while (true) {
- try {
- ipFinder.initializeLocalAddresses(locNode.socketAddresses());
-
- // Success.
- break;
- }
- catch (IllegalStateException e) {
- throw new IgniteSpiException("Failed to register local node address with IP finder: " +
- locNode.socketAddresses(), e);
- }
- catch (IgniteSpiException e) {
- LT.error(log, e, "Failed to register local node address in IP finder on start " +
- "(retrying every 2000 ms).");
- }
-
- try {
- U.sleep(2000);
- }
- catch (GridInterruptedException e) {
- throw new IgniteSpiException("Thread has been interrupted.", e);
- }
- }
- }
-
- /**
- * @throws org.apache.ignite.spi.IgniteSpiException If failed.
- */
- private void onSpiStart() throws IgniteSpiException {
- startStopwatch();
-
- assertParameter(ipFinder != null, "ipFinder != null");
- assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
- assertParameter(locPort > 1023, "localPort > 1023");
- assertParameter(locPortRange >= 0, "localPortRange >= 0");
- assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff");
- assertParameter(netTimeout > 0, "networkTimeout > 0");
- assertParameter(sockTimeout > 0, "sockTimeout > 0");
- assertParameter(ackTimeout > 0, "ackTimeout > 0");
- assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
- assertParameter(reconCnt > 0, "reconnectCnt > 0");
- assertParameter(hbFreq > 0, "heartbeatFreq > 0");
- assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
- assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0");
- assertParameter(threadPri > 0, "threadPri > 0");
- assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0");
-
- try {
- locHost = U.resolveLocalHost(locAddr);
- }
- catch (IOException e) {
- throw new IgniteSpiException("Unknown local address: " + locAddr, e);
- }
-
- if (log.isDebugEnabled()) {
- log.debug(configInfo("localHost", locHost.getHostAddress()));
- log.debug(configInfo("localPort", locPort));
- log.debug(configInfo("localPortRange", locPortRange));
- log.debug(configInfo("threadPri", threadPri));
- log.debug(configInfo("networkTimeout", netTimeout));
- log.debug(configInfo("sockTimeout", sockTimeout));
- log.debug(configInfo("ackTimeout", ackTimeout));
- log.debug(configInfo("maxAckTimeout", maxAckTimeout));
- log.debug(configInfo("reconnectCount", reconCnt));
- log.debug(configInfo("ipFinder", ipFinder));
- log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
- log.debug(configInfo("heartbeatFreq", hbFreq));
- log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs));
- log.debug(configInfo("statsPrintFreq", statsPrintFreq));
- }
-
- // Warn on odd network timeout.
- if (netTimeout < 3000)
- U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
-
- // Warn on odd heartbeat frequency.
- if (hbFreq < 2000)
- U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq);
-
- registerMBean(gridName, this, TcpDiscoverySpiMBean.class);
-
- if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
- TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder);
-
- if (mcastIpFinder.getLocalAddress() == null)
- mcastIpFinder.setLocalAddress(locAddr);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
- super.onContextInitialized0(spiCtx);
-
- ctxInitLatch.countDown();
-
- spiCtx.registerPort(tcpSrvr.port, TCP);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteSpiContext getSpiContext() {
- if (ctxInitLatch.getCount() > 0) {
- if (log.isDebugEnabled())
- log.debug("Waiting for context initialization.");
-
- try {
- U.await(ctxInitLatch);
-
- if (log.isDebugEnabled())
- log.debug("Context has been initialized.");
- }
- catch (GridInterruptedException e) {
- U.warn(log, "Thread has been interrupted while waiting for SPI context initialization.", e);
- }
- }
-
- return super.getSpiContext();
- }
-
- /** {@inheritDoc} */
- @Override public void spiStop() throws IgniteSpiException {
- spiStop0(false);
- }
-
- /**
- * Stops SPI finally or stops SPI for restart.
- *
- * @param disconnect {@code True} if SPI is being disconnected.
- * @throws org.apache.ignite.spi.IgniteSpiException If failed.
- */
- private void spiStop0(boolean disconnect) throws IgniteSpiException {
- if (ctxInitLatch.getCount() > 0)
- // Safety.
- ctxInitLatch.countDown();
-
- if (log.isDebugEnabled()) {
- if (disconnect)
- log.debug("Disconnecting SPI.");
- else
- log.debug("Preparing to start local node stop procedure.");
- }
-
- if (disconnect) {
- synchronized (mux) {
- spiState = DISCONNECTING;
- }
- }
-
- if (msgWorker != null && msgWorker.isAlive() && !disconnect) {
- // Send node left message only if it is final stop.
- msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNodeId));
-
- synchronized (mux) {
- long threshold = U.currentTimeMillis() + netTimeout;
-
- long timeout = netTimeout;
-
- while (spiState != LEFT && timeout > 0) {
- try {
- mux.wait(timeout);
-
- timeout = threshold - U.currentTimeMillis();
- }
- catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
-
- break;
- }
- }
-
- if (spiState == LEFT) {
- if (log.isDebugEnabled())
- log.debug("Verification for local node leave has been received from coordinator" +
- " (continuing stop procedure).");
- }
- else if (log.isInfoEnabled()) {
- log.info("No verification for local node leave has been received from coordinator" +
- " (will stop node anyway).");
- }
- }
- }
-
- U.interrupt(tcpSrvr);
- U.join(tcpSrvr, log);
-
- Collection<SocketReader> tmp;
-
- synchronized (mux) {
- tmp = U.arrayList(readers);
- }
-
- U.interrupt(tmp);
- U.joinThreads(tmp, log);
-
- U.interrupt(hbsSnd);
- U.join(hbsSnd, log);
-
- U.interrupt(chkStatusSnd);
- U.join(chkStatusSnd, log);
-
- U.interrupt(ipFinderCleaner);
- U.join(ipFinderCleaner, log);
-
- U.interrupt(msgWorker);
- U.join(msgWorker, log);
-
- U.interrupt(sockTimeoutWorker);
- U.join(sockTimeoutWorker, log);
-
- U.interrupt(statsPrinter);
- U.join(statsPrinter, log);
-
- if (ipFinder != null)
- ipFinder.close();
-
- Collection<TcpDiscoveryNode> rmts = null;
-
- if (!disconnect) {
- // This is final stop.
- unregisterMBean();
-
- if (log.isDebugEnabled())
- log.debug(stopInfo());
- }
- else {
- getSpiContext().deregisterPorts();
-
- rmts = ring.visibleRemoteNodes();
- }
-
- long topVer = ring.topologyVersion();
-
- ring.clear();
-
- if (rmts != null && !rmts.isEmpty()) {
- // This is restart/disconnection and remote nodes are not empty.
- // We need to fire FAIL event for each.
- DiscoverySpiListener lsnr = this.lsnr;
-
- if (lsnr != null) {
- Collection<ClusterNode> processed = new LinkedList<>();
-
- for (TcpDiscoveryNode n : rmts) {
- assert n.visible();
-
- processed.add(n);
-
- Collection<ClusterNode> top = F.viewReadOnly(rmts, F.<ClusterNode>identity(), F.notIn(processed));
-
- topVer++;
-
- Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top);
-
- lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist);
- }
- }
- }
-
- printStatistics();
-
- stats.clear();
-
- synchronized (mux) {
- // Clear stored data.
- leavingNodes.clear();
- failedNodes.clear();
-
- spiState = DISCONNECTED;
- }
- }
-
- /**
-  * {@inheritDoc}
-  * <p>
-  * After delegating to the parent implementation, counts the context initialization latch down
-  * if its count is still positive and deregisters the ports through the SPI context.
-  */
- @Override protected void onContextDestroyed0() {
-     super.onContextDestroyed0();
-
-     // Safety: open the latch in case it has not been counted down yet.
-     boolean latchPending = ctxInitLatch.getCount() > 0;
-
-     if (latchPending)
-         ctxInitLatch.countDown();
-
-     getSpiContext().deregisterPorts();
- }
-
- /**
-  * Checks whether any address registered in the IP finder resolves to one of the local node addresses.
-  * <p>
-  * The registered addresses are requested once and every one of them is resolved at most once;
-  * the search stops at the first match. Registered addresses whose host name cannot be resolved
-  * are skipped.
-  *
-  * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
-  * @return {@code true} if IP finder contains local address.
-  */
- private boolean ipFinderHasLocalAddress() throws IgniteSpiException {
-     // Nothing to compare with, so the IP finder does not have to be queried at all.
-     if (F.isEmpty(locNodeAddrs))
-         return false;
-
-     for (InetSocketAddress addr : registeredAddresses()) {
-         try {
-             int port = addr.getPort();
-
-             InetSocketAddress resolved = addr.isUnresolved() ?
-                 new InetSocketAddress(InetAddress.getByName(addr.getHostName()), port) :
-                 new InetSocketAddress(addr.getAddress(), port);
-
-             for (InetSocketAddress locAddr : locNodeAddrs) {
-                 if (resolved.equals(locAddr))
-                     return true;
-             }
-         }
-         catch (UnknownHostException ignored) {
-             // An address that cannot be resolved cannot match a local address, skip it.
-         }
-     }
-
-     return false;
- }
-
- /**
-  * {@inheritDoc}
-  * <p>
-  * The local node ID always pings successfully. A node that is not in the ring or is not visible
-  * is reported as not reachable. When pinging a non-client node fails, a status check message for
-  * that node is queued to the message worker.
-  */
- @Override public boolean pingNode(UUID nodeId) {
-     assert nodeId != null;
-
-     // Compare by value: an equal ID held in a different UUID instance still denotes the local node.
-     if (nodeId.equals(locNodeId))
-         return true;
-
-     TcpDiscoveryNode node = ring.node(nodeId);
-
-     if (node == null || !node.visible())
-         return false;
-
-     boolean res = pingNode(node);
-
-     if (!res && !node.isClient()) {
-         LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId);
-
-         msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id()));
-     }
-
-     return res;
- }
-
- /**
-  * Checks whether the given node answers a ping.
-  * <p>
-  * A client node is pinged through its router node: the request is sent to the router's addresses
-  * and carries the client node ID, and the response must confirm that the client exists.
-  * Addresses are tried one by one; the first address that produces a response decides the result.
-  *
-  * @param node Node.
-  * @return {@code True} if ping succeeds.
-  */
- private boolean pingNode(TcpDiscoveryNode node) {
-     assert node != null;
-
-     if (node.id().equals(locNodeId))
-         return true;
-
-     final boolean client = node.isClient();
-
-     // For a client the ping goes to the router node while the client ID travels in the request.
-     final UUID clientId = client ? node.id() : null;
-     final TcpDiscoveryNode target = client ? ring.node(node.clientRouterNodeId()) : node;
-
-     if (client && (target == null || !target.visible()))
-         return false;
-
-     for (InetSocketAddress addr : getNodeAddresses(target, U.sameMacs(locNode, target))) {
-         try {
-             // ID returned by the node should be the same as ID of the parameter for ping to succeed.
-             IgniteBiTuple<UUID, Boolean> pingRes = pingNode(addr, clientId);
-
-             boolean sameId = target.id().equals(pingRes.get1());
-
-             return sameId && (clientId == null || pingRes.get2());
-         }
-         catch (GridException e) {
-             if (log.isDebugEnabled())
-                 log.debug("Failed to ping node [node=" + target + ", err=" + e.getMessage() + ']');
-
-             // Fall through to the next address.
-         }
-     }
-
-     return false;
- }
-
- /**
-  * Pings the remote node by its address to see if it's alive.
-  * <p>
-  * Pings of the same address share one future registered in {@code pingMap}: a caller that finds
-  * a future already registered for the address waits for it instead of opening its own connection.
-  * Otherwise up to {@code reconCnt} connection attempts are made.
-  *
-  * @param addr Address of the node.
-  * @param clientNodeId Client node ID to put into the ping request, or {@code null}.
-  * @return Tuple of the ID of the node that created the ping response and the {@code clientExists()}
-  *      flag of that response. For a local node address it is the local node ID and {@code false}.
-  * @throws GridException If an error occurs.
-  */
- private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId)
-     throws GridException {
-     assert addr != null;
-
-     // Local address: answer without any network round trip.
-     if (F.contains(locNodeAddrs, addr))
-         return F.t(locNodeId, false);
-
-     // Key under which the future is registered. The very same key must be used for removal:
-     // 'addr' may be replaced by its resolved form below, and a resolved InetSocketAddress is never
-     // equal to an unresolved one, so removing by the reassigned 'addr' would leave the entry behind.
-     final InetSocketAddress pingKey = addr;
-
-     GridFutureAdapterEx<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapterEx<>();
-
-     IgniteFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(pingKey, fut);
-
-     if (oldFut != null)
-         // Another ping of this address is registered, wait for its outcome.
-         return oldFut.get();
-     else {
-         Collection<Throwable> errs = null;
-
-         try {
-             Socket sock = null;
-
-             for (int i = 0; i < reconCnt; i++) {
-                 try {
-                     if (addr.isUnresolved())
-                         addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
-
-                     long tstamp = U.currentTimeMillis();
-
-                     sock = openSocket(addr);
-
-                     writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
-
-                     TcpDiscoveryPingResponse res = readMessage(sock, null, netTimeout);
-
-                     if (locNodeId.equals(res.creatorNodeId())) {
-                         if (log.isDebugEnabled())
-                             log.debug("Ping response from local node: " + res);
-
-                         // No result is set here, so the outer finally block completes the future with an error.
-                         break;
-                     }
-
-                     stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
-
-                     IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists());
-
-                     fut.onDone(t);
-
-                     return t;
-                 }
-                 catch (IOException | GridException e) {
-                     // Remember the failure and try again.
-                     if (errs == null)
-                         errs = new ArrayList<>();
-
-                     errs.add(e);
-                 }
-                 finally {
-                     U.closeQuiet(sock);
-                 }
-             }
-         }
-         catch (Throwable t) {
-             fut.onDone(t);
-
-             throw U.cast(t);
-         }
-         finally {
-             // No attempt produced a result: complete the future with the collected errors.
-             if (!fut.isDone())
-                 fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs));
-
-             boolean b = pingMap.remove(pingKey, fut);
-
-             assert b;
-         }
-
-         // Reached only when no attempt returned a result, i.e. the future was completed
-         // with an error in the finally block above.
-         return fut.get();
-     }
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Runs the stop procedure with the disconnect flag set ({@code spiStop0(true)}), i.e. not as a final stop.
- */
- @Override public void disconnect() throws IgniteSpiException {
- spiStop0(true);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Stores the authenticator in the {@code nodeAuth} field; {@code joinTopology()} uses it to authenticate
- * the local node when that node is the first in the topology.
- */
- @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) {
- this.nodeAuth = nodeAuth;
- }
-
- /**
-  * Tries to join this node to topology.
-  * <p>
-  * The join is repeated until the SPI state becomes {@code CONNECTED}:
-  * <ul>
-  * <li>If no join request could be sent, the local node is authenticated locally and starts a new
-  *      topology as its first node (order {@code 1}, topology version {@code 1}).</li>
-  * <li>Otherwise the method waits up to the network timeout for the SPI state to leave
-  *      {@code CONNECTING}. It then either finishes, fails with the error matching the resulting
-  *      state, or logs a warning and repeats the join.</li>
-  * </ul>
-  *
-  * @throws org.apache.ignite.spi.IgniteSpiException If any error occurs.
-  */
- private void joinTopology() throws IgniteSpiException {
-     synchronized (mux) {
-         assert spiState == CONNECTING || spiState == DISCONNECTED;
-
-         spiState = CONNECTING;
-     }
-
-     // Read the credentials before the attribute is replaced by its marshalled form below.
-     GridSecurityCredentials locCred = (GridSecurityCredentials)locNode.getAttributes()
-         .get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
-     // Marshal credentials for backward compatibility and security.
-     marshalCredentials(locNode);
-
-     while (true) {
-         if (!sendJoinRequestMessage()) {
-             if (log.isDebugEnabled())
-                 log.debug("Join request message has not been sent (local node is the first in the topology).");
-
-             // Authenticate local node.
-             try {
-                 GridSecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
-
-                 if (subj == null)
-                     throw new IgniteSpiException("Authentication failed for local node: " + locNode.id());
-
-                 Map<String, Object> attrs = new HashMap<>(locNode.attributes());
-
-                 // Replace the credentials with the marshalled security subject.
-                 attrs.put(GridNodeAttributes.ATTR_SECURITY_SUBJECT, gridMarsh.marshal(subj));
-                 attrs.remove(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
-                 locNode.setAttributes(attrs);
-             }
-             catch (GridException e) {
-                 throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e);
-             }
-
-             locNode.order(1);
-             locNode.internalOrder(1);
-
-             gridStartTime = U.currentTimeMillis();
-
-             locNode.visible(true);
-
-             ring.clear();
-
-             ring.topologyVersion(1);
-
-             synchronized (mux) {
-                 topHist.clear();
-
-                 spiState = CONNECTED;
-
-                 mux.notifyAll();
-             }
-
-             notifyDiscovery(EVT_NODE_JOINED, 1, locNode);
-
-             break;
-         }
-
-         if (log.isDebugEnabled())
-             log.debug("Join request message has been sent (waiting for coordinator response).");
-
-         synchronized (mux) {
-             long threshold = U.currentTimeMillis() + netTimeout;
-
-             long timeout = netTimeout;
-
-             // Wait until the state changes or the network timeout elapses.
-             while (spiState == CONNECTING && timeout > 0) {
-                 try {
-                     mux.wait(timeout);
-
-                     timeout = threshold - U.currentTimeMillis();
-                 }
-                 catch (InterruptedException e) {
-                     Thread.currentThread().interrupt();
-
-                     // Keep the cause, the same way sendJoinRequestMessage() reports interruption.
-                     throw new IgniteSpiException("Thread has been interrupted.", e);
-                 }
-             }
-
-             if (spiState == CONNECTED)
-                 break;
-             else if (spiState == DUPLICATE_ID)
-                 throw duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
-             else if (spiState == AUTH_FAILED)
-                 throw authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
-             else if (spiState == CHECK_FAILED)
-                 throw checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
-             else if (spiState == LOOPBACK_PROBLEM) {
-                 TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get();
-
-                 boolean locHostLoopback = locHost.isLoopbackAddress();
-
-                 String firstNode = locHostLoopback ? "local" : "remote";
-
-                 String secondNode = locHostLoopback ? "remote" : "local";
-
-                 throw new IgniteSpiException("Failed to add node to topology because " + firstNode +
-                     " node is configured to use loopback address, but " + secondNode + " node is not " +
-                     "(consider changing 'localAddress' configuration parameter) " +
-                     "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" +
-                     U.addressesAsString(msg.addresses(), msg.hostNames()) + ']');
-             }
-             else
-                 // Still not connected: warn and repeat the join. The hint names this class.
-                 LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
-                     "Check remote nodes logs for possible error messages. " +
-                     "Note that large topology may require significant time to start. " +
-                     "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
-                     "if getting this message on the starting nodes [networkTimeout=" + netTimeout + ']');
-         }
-     }
-
-     assert locNode.order() != 0;
-     assert locNode.internalOrder() != 0;
-
-     if (log.isDebugEnabled())
-         log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
- }
-
- /**
-  * Extracts the value of the {@code rmtBuildVer=} entry from an error message.
-  * <p>
-  * All whitespace is removed from the message first. The value ends at the first {@code ','} that
-  * follows the prefix or, if there is none, at the first {@code ']'}.
-  *
-  * @param msg Error message.
-  * @return Remote grid version parsed from error message, or {@code null} if the message has no
-  *      {@code rmtBuildVer=} entry or the entry is not terminated by {@code ','} or {@code ']'}.
-  * @deprecated This method was created for preserving backward compatibility. During major version update
-  *      parsing of error message should be replaced with new {@link org.gridgain.grid.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage}
-  *      which contains all necessary information.
-  */
- @Deprecated
- @Nullable private String parseRemoteVersion(String msg) {
-     msg = msg.replaceAll("\\s", "");
-
-     final String verPrefix = "rmtBuildVer=";
-
-     int startIdx = msg.indexOf(verPrefix);
-
-     if (startIdx < 0)
-         return null;
-
-     // The value starts right after the whole prefix, so the '=' sign is not part of the result.
-     int verIdx = startIdx + verPrefix.length();
-
-     int endIdx = msg.indexOf(',', verIdx);
-
-     if (endIdx < 0)
-         endIdx = msg.indexOf(']', verIdx);
-
-     if (endIdx < 0)
-         return null;
-
-     return msg.substring(verIdx, endIdx);
- }
-
- /**
-  * Tries to send join request message to one of the addresses returned by {@code resolvedAddresses()}.
-  * Address is provided by {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is
-  * sent to first node connection succeeded to.
-  * <p>
-  * The whole address list is retried after a 2 second pause when a concurrent start is detected,
-  * or when the IP finder is not shared and does not contain the local address. In the latter case
-  * the retries are bounded by {@code joinTimeout} if it is positive.
-  *
-  * @return {@code true} if send succeeded, {@code false} if there are no addresses to send to or
-  *      no address accepted the request and no retry is required.
-  * @throws org.apache.ignite.spi.IgniteSpiException If the thread is interrupted or no address responded
-  *      within the join timeout.
-  */
- @SuppressWarnings({"BusyWait"})
- private boolean sendJoinRequestMessage() throws IgniteSpiException {
-     TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
-         exchange.collect(locNodeId));
-
-     // Time when it has been detected, that addresses from IP finder do not respond.
-     long noResStart = 0;
-
-     while (true) {
-         Collection<InetSocketAddress> addrs = resolvedAddresses();
-
-         if (F.isEmpty(addrs))
-             return false;
-
-         boolean retry = false;
-         GridException errs = null;
-
-         for (InetSocketAddress addr : addrs) {
-             try {
-                 Integer res = sendMessageDirectly(joinReq, addr);
-
-                 assert res != null;
-
-                 noResAddrs.remove(addr);
-
-                 // Address is responsive, reset period start.
-                 noResStart = 0;
-
-                 switch (res) {
-                     case RES_WAIT:
-                         // Concurrent startup, try sending join request again or wait if no success.
-                         retry = true;
-
-                         break;
-                     case RES_OK:
-                         if (log.isDebugEnabled())
-                             log.debug("Join request message has been sent to address [addr=" + addr +
-                                 ", req=" + joinReq + ']');
-
-                         // Join request sending succeeded, wait for response from topology.
-                         return true;
-
-                     default:
-                         // Concurrent startup, try next node.
-                         if (res == RES_CONTINUE_JOIN) {
-                             if (!fromAddrs.contains(addr))
-                                 retry = true;
-                         }
-                         else {
-                             if (log.isDebugEnabled())
-                                 log.debug("Unexpected response to join request: " + res);
-
-                             retry = true;
-                         }
-
-                         break;
-                 }
-             }
-             catch (IgniteSpiException e) {
-                 if (errs == null)
-                     errs = new GridException("Multiple connection attempts failed.");
-
-                 errs.addSuppressed(e);
-
-                 if (log.isDebugEnabled()) {
-                     IOException ioe = X.cause(e, IOException.class);
-
-                     // The conditional must be parenthesized: '+' binds tighter than '?:', so without
-                     // the parentheses the concatenated string itself would be tested against null.
-                     log.debug("Failed to send join request message [addr=" + addr +
-                         ", msg=" + (ioe != null ? ioe.getMessage() : e.getMessage()) + ']');
-                 }
-
-                 noResAddrs.add(addr);
-             }
-         }
-
-         if (retry) {
-             if (log.isDebugEnabled())
-                 log.debug("Concurrent discovery SPI start has been detected (local node should wait).");
-
-             try {
-                 U.sleep(2000);
-             }
-             catch (GridInterruptedException e) {
-                 throw new IgniteSpiException("Thread has been interrupted.", e);
-             }
-         }
-         else if (!ipFinder.isShared() && !ipFinderHasLocAddr) {
-             if (errs != null && X.hasCause(errs, ConnectException.class))
-                 LT.warn(log, null, "Failed to connect to any address from IP finder " +
-                     "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
-                     addrs);
-
-             if (joinTimeout > 0) {
-                 // Start measuring on the first unsuccessful pass, fail once the join timeout is exceeded.
-                 if (noResStart == 0)
-                     noResStart = U.currentTimeMillis();
-                 else if (U.currentTimeMillis() - noResStart > joinTimeout)
-                     throw new IgniteSpiException(
-                         "Failed to connect to any address from IP finder within join timeout " +
-                             "(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
-                             "on all host machines, or consider increasing 'joinTimeout' configuration property): " +
-                             addrs, errs);
-             }
-
-             try {
-                 U.sleep(2000);
-             }
-             catch (GridInterruptedException e) {
-                 throw new IgniteSpiException("Thread has been interrupted.", e);
-             }
-         }
-         else
-             break;
-     }
-
-     return false;
- }
-
- /**
-  * Establishes connection to an address, sends message and returns the response (if any).
-  * <p>
-  * Every attempt opens a socket, performs a handshake, writes the message and reads the receipt.
-  * At most {@code reconCnt} attempts are made. A failure to open the socket is retried only once,
-  * and after a socket timeout the acknowledgement timeout is doubled for the next attempt as long
-  * as {@code checkAckTimeout()} accepts the new value.
-  *
-  * @param msg Message to send.
-  * @param addr Address to send message to.
-  * @return Response read from the recipient or {@code null} if no response is supposed.
-  *      {@code RES_OK} is also returned when a join request was written but its receipt could not be read.
-  * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
-  */
- @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr)
-     throws IgniteSpiException {
-     assert msg != null;
-     assert addr != null;
-
-     Collection<Throwable> errs = null;
-
-     Socket sock = null;
-
-     long ackTimeout0 = ackTimeout;
-
-     int connectAttempts = 1;
-
-     boolean joinReqSent = false;
-
-     for (int i = 0; i < reconCnt; i++) {
-         // Need to set to false on each new iteration,
-         // since remote node may leave in the middle of the first iteration.
-         joinReqSent = false;
-
-         boolean openSock = false;
-
-         try {
-             long tstamp = U.currentTimeMillis();
-
-             sock = openSocket(addr);
-
-             openSock = true;
-
-             // Handshake.
-             writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
-
-             TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout0);
-
-             if (locNodeId.equals(res.creatorNodeId())) {
-                 if (log.isDebugEnabled())
-                     log.debug("Handshake response from local node: " + res);
-
-                 break;
-             }
-
-             stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
-
-             // Send message.
-             tstamp = U.currentTimeMillis();
-
-             writeToSocket(sock, msg);
-
-             stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
-
-             if (debugMode)
-                 debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
-                     ", rmtNodeId=" + res.creatorNodeId() + ']');
-
-             if (log.isDebugEnabled())
-                 log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
-                     ", rmtNodeId=" + res.creatorNodeId() + ']');
-
-             // Connection has been established, but
-             // join request may not be unmarshalled on remote host.
-             // E.g. due to class not found issue.
-             joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
-
-             return readReceipt(sock, ackTimeout0);
-         }
-         catch (ClassCastException e) {
-             // This issue is rarely reproducible on AmazonEC2, but never
-             // on dedicated machines.
-             if (log.isDebugEnabled())
-                 U.error(log, "Class cast exception on direct send: " + addr, e);
-
-             if (errs == null)
-                 errs = new ArrayList<>();
-
-             errs.add(e);
-         }
-         catch (IOException | GridException e) {
-             if (log.isDebugEnabled())
-                 log.error("Exception on direct send: " + e.getMessage(), e);
-
-             if (errs == null)
-                 errs = new ArrayList<>();
-
-             errs.add(e);
-
-             if (!openSock) {
-                 // Reconnect for the second time, if connection is not established.
-                 if (connectAttempts < 2) {
-                     connectAttempts++;
-
-                     continue;
-                 }
-
-                 break; // Don't retry if we can not establish connection.
-             }
-
-             if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
-                 // Give the remote node twice as much time on the next attempt.
-                 ackTimeout0 *= 2;
-
-                 if (!checkAckTimeout(ackTimeout0))
-                     break;
-             }
-         }
-         finally {
-             U.closeQuiet(sock);
-         }
-     }
-
-     if (joinReqSent) {
-         // The message names the code that is actually returned below.
-         if (log.isDebugEnabled())
-             log.debug("Join request has been sent, but receipt has not been read (returning RES_OK).");
-
-         // Topology will not include this node,
-         // however, warning on timed out join will be output.
-         return RES_OK;
-     }
-
-     throw new IgniteSpiException(
-         "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']',
-         U.exceptionWithSuppressed("Failed to send message to address " +
-             "[addr=" + addr + ", msg=" + msg + ']', errs));
- }
-
- /**
-  * Replaces the security credentials attribute of the node with its marshalled form.
-  * <p>
-  * Works on a copy of the node attributes: the credentials value is marshalled with the discovery
-  * SPI marshaller, stored back under the same key, and the copy is then set on the node.
-  *
-  * @param node Node to marshall credentials for.
-  * @throws org.apache.ignite.spi.IgniteSpiException If marshalling failed.
-  */
- private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
-     final String credKey = GridNodeAttributes.ATTR_SECURITY_CREDENTIALS;
-
-     try {
-         // Use security-unsafe getter.
-         Map<String, Object> attrsCp = new HashMap<>(node.getAttributes());
-
-         Object plainCred = attrsCp.get(credKey);
-
-         attrsCp.put(credKey, marsh.marshal(plainCred));
-
-         node.setAttributes(attrsCp);
-     }
-     catch (GridException e) {
-         throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e);
-     }
- }
-
- /**
-  * Reads the marshalled security credentials attribute of the node and unmarshals it with the
-  * discovery SPI marshaller. Node attributes are left untouched.
-  *
-  * @param node Node to unmarshall credentials for.
-  * @return Security credentials, or {@code null} if the node has no credentials attribute.
-  * @throws org.apache.ignite.spi.IgniteSpiException If unmarshal fails.
-  */
- private GridSecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
-     try {
-         Object rawCred = node.getAttributes().get(GridNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
-         byte[] credBytes = (byte[])rawCred;
-
-         if (credBytes != null)
-             return marsh.unmarshal(credBytes, null);
-
-         return null;
-     }
-     catch (GridException e) {
-         throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
-     }
- }
-
- /**
-  * Validates an acknowledgement timeout against the configured maximum and logs a warning when
-  * the maximum is exceeded.
-  *
-  * @param ackTimeout Acknowledgement timeout.
-  * @return {@code True} if acknowledgement timeout is less or equal to
-  *      maximum acknowledgement timeout, {@code false} otherwise.
-  */
- private boolean checkAckTimeout(long ackTimeout) {
-     boolean withinLimit = ackTimeout <= maxAckTimeout;
-
-     if (!withinLimit)
-         LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
-             "(consider increasing 'maxAckTimeout' configuration property) " +
-             "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + maxAckTimeout + ']');
-
-     return withinLimit;
- }
-
- /**
-  * Passes a discovery event to the external listener.
-  * <p>
-  * The listener is called only if it is set, the node is visible and the SPI state is either
-  * {@code CONNECTED} or {@code DISCONNECTING}; otherwise the event is skipped. The listener receives
-  * the currently visible nodes as the topology snapshot together with the updated topology history.
-  *
-  * @param type Discovery event type. See {@link org.apache.ignite.events.IgniteDiscoveryEvent} for more details.
-  * @param topVer Topology version.
-  * @param node Remote node this event is connected with.
-  */
- private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) {
-     assert type > 0;
-     assert node != null;
-
-     DiscoverySpiListener lsnr0 = this.lsnr;
-
-     TcpDiscoverySpiState state = spiStateCopy();
-
-     boolean deliver = lsnr0 != null && node.visible() && (state == CONNECTED || state == DISCONNECTING);
-
-     if (!deliver) {
-         if (log.isDebugEnabled())
-             log.debug("Skipped discovery notification [node=" + node + ", spiState=" + state +
-                 ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
-
-         return;
-     }
-
-     if (log.isDebugEnabled())
-         log.debug("Discovery notification [node=" + node + ", spiState=" + state +
-             ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
-
-     Collection<ClusterNode> snapshot = F.<TcpDiscoveryNode, ClusterNode>upcast(ring.visibleNodes());
-
-     Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, snapshot);
-
-     lsnr0.onDiscovery(type, topVer, node, snapshot, hist);
- }
-
- /**
-  * Records a topology snapshot in the topology history.
-  * <p>
-  * After the snapshot is added, entries with the smallest versions are dropped until the history
-  * holds no more than {@code topHistSize} entries.
-  *
-  * @param topVer Topology version.
-  * @param top Topology snapshot.
-  * @return Copy of updated topology history, or {@code null} if the history already contains
-  *      the given version (nothing is changed in that case).
-  */
- @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
-     synchronized (mux) {
-         if (!topHist.containsKey(topVer)) {
-             topHist.put(topVer, top);
-
-             // Evict the oldest versions while the history is over its limit.
-             while (topHist.size() > topHistSize)
-                 topHist.remove(topHist.firstKey());
-
-             if (log.isDebugEnabled())
-                 log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size());
-
-             return new TreeMap<>(topHist);
-         }
-
-         return null;
-     }
- }
-
- /**
-  * Collects all addresses of a node: its socket addresses first, then the external addresses
-  * stored in the node attribute named by {@code createSpiAttributeName(ATTR_EXT_ADDRS)}, if any.
-  *
-  * @param node Node.
-  * @return {@link LinkedHashSet} of internal and external addresses of provided node.
-  *      Internal addresses placed before external addresses.
-  */
- @SuppressWarnings("TypeMayBeWeakened")
- private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) {
-     LinkedHashSet<InetSocketAddress> allAddrs = new LinkedHashSet<>(node.socketAddresses());
-
-     String extAttrName = createSpiAttributeName(ATTR_EXT_ADDRS);
-
-     Collection<InetSocketAddress> external = node.attribute(extAttrName);
-
-     if (external == null)
-         return allAddrs;
-
-     allAddrs.addAll(external);
-
-     return allAddrs;
- }
-
- /**
-  * Collects all addresses of a node with the socket addresses ordered by
-  * {@code U.inetAddressesComparator(sameHost)}. External addresses taken from the node attribute
-  * follow in the order they are stored.
-  *
-  * @param node Node.
-  * @param sameHost Same host flag.
-  * @return {@link LinkedHashSet} of internal and external addresses of provided node.
-  *      Internal addresses placed before external addresses.
-  *      Internal addresses will be sorted with {@code inetAddressesComparator(sameHost)}.
-  */
- @SuppressWarnings("TypeMayBeWeakened")
- private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node, boolean sameHost) {
-     List<InetSocketAddress> internal = U.arrayList(node.socketAddresses());
-
-     Collections.sort(internal, U.inetAddressesComparator(sameHost));
-
-     LinkedHashSet<InetSocketAddress> allAddrs = new LinkedHashSet<>(internal);
-
-     String extAttrName = createSpiAttributeName(ATTR_EXT_ADDRS);
-
-     Collection<InetSocketAddress> external = node.attribute(extAttrName);
-
-     if (external == null)
-         return allAddrs;
-
-     allAddrs.addAll(external);
-
-     return allAddrs;
- }
-
- /**
-  * Tells whether the local node currently acts as coordinator, i.e. the SPI is connected and the
-  * resolved coordinator is the local node. Nodes that are leaving or failed (but are still in
-  * topology) are not considered. A positive answer is also reported to the statistics.
-  *
-  * @return {@code true} if local node is coordinator.
-  */
- private boolean isLocalNodeCoordinator() {
-     synchronized (mux) {
-         if (spiState != CONNECTED || !locNode.equals(resolveCoordinator()))
-             return false;
-
-         stats.onBecomingCoordinator();
-
-         return true;
-     }
- }
-
- /**
-  * Reads the current SPI state while holding {@code mux}.
-  *
-  * @return Spi state copy.
-  */
- private TcpDiscoverySpiState spiStateCopy() {
-     synchronized (mux) {
-         return spiState;
-     }
- }
-
- /**
- * Resolves coordinator. Nodes that are leaving or failed (but are still in
- * topology) are removed from search.
- * <p>
- * Same as {@code resolveCoordinator(null)}, i.e. no additional nodes are excluded.
- *
- * @return Coordinator node or {@code null} if there are no coordinator
- * (i.e. local node is the last one and is currently stopping).
- */
- @Nullable private TcpDiscoveryNode resolveCoordinator() {
- return resolveCoordinator(null);
- }
-
- /**
-  * Asks the ring for the coordinator while excluding failed nodes, leaving nodes and, when given,
-  * the nodes of the provided filter.
-  *
-  * @param filter Nodes to exclude when resolving coordinator (optional).
-  * @return Coordinator node or {@code null} if there are no coordinator
-  *      (i.e. local node is the last one and is currently stopping).
-  */
- @Nullable private TcpDiscoveryNode resolveCoordinator(
-     @Nullable Collection<TcpDiscoveryNode> filter) {
-     synchronized (mux) {
-         Collection<TcpDiscoveryNode> skipped = F.concat(false, failedNodes, leavingNodes);
-
-         if (F.isEmpty(filter))
-             return ring.coordinator(skipped);
-
-         Collection<TcpDiscoveryNode> skippedWithFilter = F.concat(false, skipped, filter);
-
-         return ring.coordinator(skippedWithFilter);
-     }
- }
-
- /**
-  * Prints SPI statistics at info level.
-  * <p>
-  * Nothing is printed unless info logging is enabled and {@code statsPrintFreq} is positive.
-  */
- private void printStatistics() {
-     if (!log.isInfoEnabled() || statsPrintFreq <= 0)
-         return;
-
-     int failedCnt;
-     int leavingCnt;
-
-     // Read both sizes under the same lock.
-     synchronized (mux) {
-         failedCnt = failedNodes.size();
-         leavingCnt = leavingNodes.size();
-     }
-
-     Runtime rt = Runtime.getRuntime();
-
-     TcpDiscoveryNode crd = resolveCoordinator();
-
-     // NOTE(review): "heapTotal" is fed from Runtime.maxMemory(), not totalMemory() - confirm this is intended.
-     log.info("Discovery SPI statistics [statistics=" + stats + ", spiState=" + spiStateCopy() +
-         ", coord=" + crd +
-         ", topSize=" + ring.allNodes().size() +
-         ", leavingNodesSize=" + leavingCnt + ", failedNodesSize=" + failedCnt +
-         ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") +
-         ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") +
-         ", heapFree=" + rt.freeMemory() / (1024 * 1024) +
-         "M, heapTotal=" + rt.maxMemory() / (1024 * 1024) + "M]");
- }
-
- /**
-  * Fills a node added message with the data required by the joining node itself.
-  * <p>
-  * Does nothing unless the message is a {@link TcpDiscoveryNodeAddedMessage} whose node is the
-  * destination node. Otherwise the message receives the ring nodes with an internal order lower
-  * than the one of the joining node, the given messages and a copy of the topology history.
-  *
-  * @param msg Message to prepare.
-  * @param destNodeId Destination node ID.
-  * @param msgs Messages to include.
-  * @param discardMsgId Discarded message ID.
-  */
- private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
-     @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
-     assert destNodeId != null;
-
-     if (!(msg instanceof TcpDiscoveryNodeAddedMessage))
-         return;
-
-     TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
-
-     TcpDiscoveryNode joining = addedMsg.node();
-
-     if (!joining.id().equals(destNodeId))
-         return;
-
-     Collection<TcpDiscoveryNode> ringNodes = ring.allNodes();
-     Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(ringNodes.size());
-
-     for (TcpDiscoveryNode ringNode : ringNodes) {
-         assert ringNode.internalOrder() != 0 : ringNode;
-
-         // Skip next node and nodes added after next
-         // in case this message is resent due to failures/leaves.
-         // There will be separate messages for nodes with greater
-         // internal order.
-         if (ringNode.internalOrder() < addedMsg.node().internalOrder())
-             topToSend.add(ringNode);
-     }
-
-     addedMsg.topology(topToSend);
-     addedMsg.messages(msgs, discardMsgId);
-
-     Map<Long, Collection<ClusterNode>> histCp;
-
-     synchronized (mux) {
-         histCp = new TreeMap<>(topHist);
-     }
-
-     addedMsg.topologyHistory(histCp);
- }
-
- /**
-  * Resets topology, topology history and messages of a node added message to {@code null}.
-  * Messages of any other type are left untouched.
-  *
-  * @param msg Message to clear.
-  */
- private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) {
-     if (!(msg instanceof TcpDiscoveryNodeAddedMessage))
-         return;
-
-     // Nullify topology before registration.
-     TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
-
-     addedMsg.topology(null);
-     addedMsg.topologyHistory(null);
-     addedMsg.messages(null, null);
- }
-
- /**
- * <strong>FOR TEST ONLY!!!</strong>
- * <p>
- * Simulates this node failure by stopping service threads. So, node will become
- * unresponsive.
- * <p>
- * Each thread is interrupted and then joined before the next one is handled, in this order:
- * TCP server, heartbeats sender, status check sender, IP finder cleaner, socket readers,
- * message worker, statistics printer.
- * <p>
- * This method is intended for test purposes only.
- */
- void simulateNodeFailure() {
- U.warn(log, "Simulating node failure: " + locNodeId);
-
- U.interrupt(tcpSrvr);
- U.join(tcpSrvr, log);
-
- U.interrupt(hbsSnd);
- U.join(hbsSnd, log);
-
- U.interrupt(chkStatusSnd);
- U.join(chkStatusSnd, log);
-
- U.interrupt(ipFinderCleaner);
- U.join(ipFinderCleaner, log);
-
- Collection<SocketReader> tmp;
-
- // Copy the readers under the lock, stop them outside of it.
- synchronized (mux) {
- tmp = U.arrayList(readers);
- }
-
- U.interrupt(tmp);
- U.joinThreads(tmp, log);
-
- U.interrupt(msgWorker);
- U.join(msgWorker, log);
-
- // NOTE(review): unlike the regular stop sequence, sockTimeoutWorker is not stopped here - confirm this is intended.
- U.interrupt(statsPrinter);
- U.join(statsPrinter, log);
- }
-
- /**
-  * <strong>FOR TEST ONLY!!!</strong>
-  * <p>
-  * Simulates situation when next node is still alive but is bypassed
-  * since it has been excluded from the ring, possibly, due to short time
-  * network problems.
-  * <p>
-  * The next node is looked up in the ring with the failed nodes excluded; if there is one,
-  * a node failed message for it is queued to the message worker.
-  * <p>
-  * This method is intended for test purposes only.
-  */
- void forceNextNodeFailure() {
-     U.warn(log, "Next node will be forcibly failed (if any).");
-
-     TcpDiscoveryNode victim;
-
-     synchronized (mux) {
-         victim = ring.nextNode(failedNodes);
-     }
-
-     if (victim == null)
-         return;
-
-     msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, victim.id(), victim.internalOrder()));
- }
-
- /**
- * <strong>FOR TEST ONLY!!!</strong>
- * <p>
- * Hook with an empty body in this class.
- * <p>
- * This method is intended for test purposes only.
- *
- * @param msg Message.
- */
- void onBeforeMessageSentAcrossRing(Serializable msg) {
- // No-op.
- }
-
- /**
- * <strong>FOR TEST ONLY!!!</strong>
- * <p>
- * Exposes the ring instance used by this SPI (the instance itself, not a copy).
- * <p>
- * This method is intended for test purposes only.
- *
- * @return Nodes ring.
- */
- TcpDiscoveryNodesRing ring() {
- return ring;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Dumps the debug info to the logger of this SPI.
- */
- @Override public void dumpDebugInfo() {
- dumpDebugInfo(log);
- }
-
- /**
-  * Writes a debug dump of the SPI to the given logger.
-  * <p>
-  * The dump is built while holding {@code mux} and contains the local node, the SPI state, the
-  * status of the internal threads, the socket readers, the in-memory debug log, the leaving and
-  * failed nodes and the statistics. If the SPI is not in debug mode, only a warning is logged.
-  *
-  * @param log Logger.
-  */
- public void dumpDebugInfo(IgniteLogger log) {
-     if (!debugMode) {
-         U.quietAndWarn(log, "Failed to dump debug info (discovery SPI was not configured " +
-             "in debug mode, consider setting 'debugMode' configuration property to 'true').");
-
-         return;
-     }
-
-     assert log.isInfoEnabled();
-
-     synchronized (mux) {
-         StringBuilder dump = new StringBuilder(U.nl());
-
-         // Header.
-         dump.append(">>>").append(U.nl())
-             .append(">>>").append("Dumping discovery SPI debug info.").append(U.nl())
-             .append(">>>").append(U.nl());
-
-         // Local node and state.
-         dump.append("Local node ID: ").append(locNodeId).append(U.nl()).append(U.nl())
-             .append("Local node: ").append(locNode).append(U.nl()).append(U.nl())
-             .append("SPI state: ").append(spiState).append(U.nl()).append(U.nl());
-
-         // Internal threads.
-         dump.append("Internal threads: ").append(U.nl())
-             .append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl())
-             .append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl())
-             .append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl())
-             .append(" Socket timeout worker: ").append(threadStatus(sockTimeoutWorker)).append(U.nl())
-             .append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl())
-             .append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl())
-             .append(U.nl());
-
-         dump.append("Socket readers: ").append(U.nl());
-
-         for (SocketReader rdr : readers)
-             dump.append(" ").append(rdr).append(U.nl());
-
-         dump.append(U.nl()).append("In-memory log messages: ").append(U.nl());
-
-         for (String logMsg : debugLog)
-             dump.append(" ").append(logMsg).append(U.nl());
-
-         dump.append(U.nl()).append("Leaving nodes: ").append(U.nl());
-
-         for (TcpDiscoveryNode leaving : leavingNodes)
-             dump.append(" ").append(leaving.id()).append(U.nl());
-
-         dump.append(U.nl()).append("Failed nodes: ").append(U.nl());
-
-         for (TcpDiscoveryNode failed : failedNodes)
-             dump.append(" ").append(failed.id()).append(U.nl());
-
-         dump.append(U.nl()).append("Stats: ").append(stats).append(U.nl());
-
-         U.quietAndInfo(log, dump.toString());
-     }
- }
-
- /**
-  * Adds a message to the in-memory debug log.
-  * <p>
-  * The message is prefixed with the current time, the name of the calling thread, the local node
-  * ID and the internal order of the local node. Afterwards the oldest entries are dropped so that
-  * the log does not exceed {@code debugMsgHist} entries.
-  *
-  * @param msg Message.
-  */
- private void debugLog(String msg) {
-     assert debugMode;
-
-     String tstamp = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis()));
-
-     String threadName = Thread.currentThread().getName();
-
-     debugLog.add(tstamp + '[' + threadName + "][" + locNodeId + "-" + locNode.internalOrder() + "] " + msg);
-
-     // Number of entries over the limit as seen right after the add.
-     int excess = debugLog.size() - debugMsgHist;
-
-     while (excess-- > 0 && debugLog.size() > debugMsgHist)
-         debugLog.poll();
- }
-
- /**
-  * Tells whether a message is recorded in debug mode. Heartbeat, status check and discard
-  * messages are not recorded; every other message is.
-  *
-  * @param msg Message.
-  * @return {@code True} if recordable in debug mode.
-  */
- private boolean recordable(TcpDiscoveryAbstractMessage msg) {
-     boolean excluded = msg instanceof TcpDiscoveryHeartbeatMessage ||
-         msg instanceof TcpDiscoveryStatusCheckMessage ||
-         msg instanceof TcpDiscoveryDiscardMessage;
-
-     return !excluded;
- }
-
- /**
-  * Describes a thread for the debug dump.
-  *
-  * @param t Thread.
-  * @return {@code "N/A"} for a {@code null} thread, otherwise {@code "alive"} or {@code "dead"}
-  *      depending on {@link Thread#isAlive()}.
-  */
- private String threadStatus(Thread t) {
-     return t == null ? "N/A" : (t.isAlive() ? "alive" : "dead");
- }
-
- /**
-  * Compares two {@link GridSecurityPermissionSet} objects.
-  * <p>
-  * The sets are equal when their default-allow-all flags match and their system, cache and task
-  * permissions are equal regardless of order.
-  *
-  * @param locPerms The first set of permissions.
-  * @param rmtPerms The second set of permissions.
-  * @return {@code True} if given parameters contain the same permissions, {@code False} otherwise.
-  */
- private boolean permissionsEqual(GridSecurityPermissionSet locPerms, GridSecurityPermissionSet rmtPerms) {
-     boolean locDfltAllow = locPerms.defaultAllowAll();
-     boolean rmtDfltAllow = rmtPerms.defaultAllowAll();
-
-     boolean sameDflt = locDfltAllow == rmtDfltAllow;
-
-     // Each permission belongs to one of three groups: system, cache or task.
-     boolean samePerms = F.eqNotOrdered(rmtPerms.systemPermissions(), locPerms.systemPermissions()) &&
-         F.eqNotOrdered(rmtPerms.cachePermissions(), locPerms.cachePermissions()) &&
-         F.eqNotOrdered(rmtPerms.taskPermissions(), locPerms.taskPermissions());
-
-     return sameDflt && samePerms;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TcpDiscoverySpi.class, this);
- }
-
- /**
- * Thread that sends heartbeats.
- */
- private class HeartbeatsSender extends IgniteSpiThread {
- /**
- * Constructor.
- */
- private HeartbeatsSender() {
- super(gridName, "tcp-disco-hb-sender", log);
-
- setPriority(threadPri);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("BusyWait")
- @Override protected void body() throws InterruptedException {
- while (!isLocalNodeCoordinator())
- Thread.sleep(1000);
-
- if (log.isDebugEnabled())
- log.debug("Heartbeats sender has been started.");
-
- while (!isInterrupted()) {
- if (spiStateCopy() != CONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Stopping heartbeats sender (SPI is not connected to topology).");
-
- return;
- }
-
- TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(locNodeId);
-
- msg.verify(locNodeId);
-
- msgWorker.addMessage(msg);
-
- Thread.sleep(hbFreq);
- }
- }
- }
-
- /**
- * Thread that sends status check messages to next node if local node has not
- * been receiving heartbeats ({@link org.gridgain.grid.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage})
- * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
- * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
- */
- private class CheckStatusSender extends IgniteSpiThread {
- /**
- * Constructor.
- */
- private CheckStatusSender() {
- super(gridName, "tcp-disco-status-check-sender", log);
-
- setPriority(threadPri);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("BusyWait")
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Status check sender has been started.");
-
- // Only 1 heartbeat missing is acceptable. 1 sec is added to avoid false alarm.
- long checkTimeout = (long)maxMissedHbs * hbFreq + 1000;
-
- long lastSent = 0;
-
- while (!isInterrupted()) {
- // 1. Determine timeout.
- if (lastSent < locNode.lastUpdateTime())
- lastSent = locNode.lastUpdateTime();
-
- long timeout = (lastSent + checkTimeout) - U.currentTimeMillis();
-
- if (timeout > 0)
- Thread.sleep(timeout);
-
- // 2. Check if SPI is still connected.
- if (spiStateCopy() != CONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Stopping status check sender (SPI is not connected to topology).");
-
- return;
- }
-
- // 3. Was there an update?
- if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) {
- if (log.isDebugEnabled())
- log.debug("Skipping status check send " +
- "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) +
- ", hasRmts=" + ring.hasRemoteNodes() + ']');
-
- continue;
- }
-
- // 4. Send status check message.
- lastSent = U.currentTimeMillis();
-
- msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
- }
- }
- }
-
- /**
- * Thread that cleans IP finder and keeps it in the correct state, unregistering
- * addresses of the nodes that have left the topology and re-registering addresses
- * of topology nodes that are missing from the IP finder.
- * <p>
- * The thread wakes up every {@code ipFinderCleanFreq} milliseconds. It does the cleanup
- * only while the local node is the coordinator and will clean IP finder
- * if and only if {@link org.gridgain.grid.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} is {@code true}.
- */
- private class IpFinderCleaner extends IgniteSpiThread {
- /**
- * Constructor. Names the thread and applies the configured SPI thread priority.
- */
- private IpFinderCleaner() {
- super(gridName, "tcp-disco-ip-finder-cleaner", log);
-
- setPriority(threadPri);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Loops until interrupted: sleeps for the clean frequency, skips the iteration if the
- * local node is not the coordinator, exits if the SPI state is not {@code CONNECTED},
- * and otherwise cleans the IP finder when it is shared.
- */
- @SuppressWarnings("BusyWait")
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("IP finder cleaner has been started.");
-
- while (!isInterrupted()) {
- Thread.sleep(ipFinderCleanFreq);
-
- if (!isLocalNodeCoordinator())
- continue; // Not the coordinator: nothing to clean, and the SPI state check below is skipped.
-
- if (spiStateCopy() != CONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Stopping IP finder cleaner (SPI is not connected to topology).");
-
- return;
- }
-
- if (ipFinder.isShared())
- cleanIpFinder();
- }
- }
-
- /**
- * Cleans IP finder. Registered addresses that do not belong to any topology node and
- * do not answer a ping are unregistered; addresses of topology nodes that are not
- * registered are registered again. An {@link IgniteSpiException} is logged, not rethrown.
- */
- private void cleanIpFinder() {
- assert ipFinder.isShared();
-
- try {
- // Addresses that belong to nodes in topology (client nodes contribute no addresses).
- Collection<InetSocketAddress> currAddrs = F.flatCollections(
- F.viewReadOnly(
- ring.allNodes(),
- new C1<TcpDiscoveryNode, Collection<InetSocketAddress>>() {
- @Override public Collection<InetSocketAddress> apply(TcpDiscoveryNode node) {
- return !node.isClient() ? getNodeAddresses(node) :
- Collections.<InetSocketAddress>emptyList();
- }
- }
- )
- );
-
- // Addresses registered in IP finder.
- Collection<InetSocketAddress> regAddrs = registeredAddresses();
-
- // Keep only registered addresses that are not in topology and do not respond to a ping.
- Collection<InetSocketAddress> rmvAddrs = F.view(
- regAddrs,
- F.notContains(currAddrs),
- new P1<InetSocketAddress>() {
- private final Map<InetSocketAddress, Boolean> pingResMap =
- new HashMap<>(); // Ping outcome per address, so a repeated check of the same address does not ping again.
-
- @Override public boolean apply(InetSocketAddress addr) {
- Boolean res = pingResMap.get(addr);
-
- if (res == null) {
- try {
- res = pingNode(addr, null).get1() != null; // Non-null first element is treated as "address responded".
- }
- catch (GridException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to ping node [addr=" + addr +
- ", err=" + e.getMessage() + ']');
-
- res = false; // A failed ping counts as "no response".
- }
- finally {
- pingResMap.put(addr, res);
- }
- }
-
- return !res; // Accept (i.e. schedule for removal) only addresses that did not respond.
- }
- }
- );
-
- // Unregister addresses of dead nodes.
- if (!rmvAddrs.isEmpty()) {
- ipFinder.unregisterAddresses(rmvAddrs);
-
- if (log.isDebugEnabled())
- log.debug("Unregistered addresses from IP finder: " + rmvAddrs);
- }
-
- // Addresses that were removed by mistake (e.g. on segmentation).
- Collection<InetSocketAddress> missingAddrs = F.view(
- currAddrs,
- F.notContains(regAddrs)
- );
-
- // Re-register missing addresses.
- if (!missingAddrs.isEmpty()) {
- ipFinder.registerAddresses(missingAddrs);
-
- if (log.isDebugEnabled())
- log.debug("Registered missing addresses in IP finder: " + missingAddrs);
- }
- }
- catch (IgniteSpiException e) {
- LT.error(log, e, "Failed to clean IP finder up.");
- }
- }
- }
-
- /**
-  * Pending messages container: a FIFO queue of messages plus the ID of the last
-  * discarded message.
-  */
- private static class PendingMessages {
-     /** Queue size above which {@link #add(TcpDiscoveryAbstractMessage)} starts polling from the head. */
-     private static final int MAX = 1024;
-
-     /** Pending messages. */
-     private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
-
-     /** Discarded message ID. */
-     private IgniteUuid discardId;
-
-     /**
-      * Adds pending message and shrinks queue if it exceeds limit. While the queue holds
-      * more than {@code MAX} messages the head is polled; polling stops early once the
-      * polled message is the one with the discarded ID.
-      *
-      * @param msg Message to add.
-      */
-     void add(TcpDiscoveryAbstractMessage msg) {
-         msgs.add(msg);
-
-         boolean reachedDiscarded = false;
-
-         while (!reachedDiscarded && msgs.size() > MAX) {
-             TcpDiscoveryAbstractMessage head = msgs.poll();
-
-             assert head != null;
-
-             reachedDiscarded = head.id().equals(discardId);
-         }
-     }
-
-     /**
-      * Gets messages that follow the message with the provided ID (the message itself is
-      * not included). If no message with such ID is in the queue, {@code null} is returned
-      * (this indicates a failure condition when it was already removed from queue).
-      *
-      * @param lastMsgId Last message ID.
-      * @return Collection of messages, or {@code null} if the ID was not found.
-      */
-     @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) {
-         assert lastMsgId != null;
-
-         Collection<TcpDiscoveryAbstractMessage> tail = new ArrayList<>(msgs.size());
-
-         boolean found = false;
-
-         for (TcpDiscoveryAbstractMessage pending : msgs) {
-             if (found)
-                 tail.add(pending);
-             else if (pending.id().equals(lastMsgId))
-                 found = true;
-         }
-
-         if (!found)
-             return null;
-
-         return tail;
-     }
-
-     /**
-      * Resets pending messages: replaces the queue content and the discarded message ID.
-      *
-      * @param msgs New messages, {@code null} leaves the queue empty.
-      * @param discardId Discarded message ID.
-      */
-     void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
-         Queue<TcpDiscoveryAbstractMessage> queue = this.msgs;
-
-         queue.clear();
-
-         if (msgs != null)
-             queue.addAll(msgs);
-
-         this.discardId = discardId;
-     }
-
-     /**
-      * Clears pending messages and forgets the discarded message ID.
-      */
-     void clear() {
-         reset(null, null);
-     }
-
-     /**
-      * Remembers the ID of the discarded message. The queue itself is not touched here;
-      * the ID is consulted when the queue is shrunk on add.
-      *
-      * @param id Discarded message ID.
-      */
-     void discard(IgniteUuid id) {
-         discardId = id;
-     }
- }
-
- /**
- * Message worker thread for mess
<TRUNCATED>