You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/01 13:44:54 UTC
[02/53] [abbrv] incubator-ignite git commit: # IGNITE-943 Merge
TcpDiscoverySpi and TcpClientDiscoverySpi
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 8ceac1c..2b2c691 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -18,22 +18,17 @@
package org.apache.ignite.spi.discovery.tcp;
import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.events.*;
-import org.apache.ignite.internal.processors.security.*;
import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.security.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.discovery.*;
@@ -45,22 +40,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.spi.discovery.tcp.messages.*;
import org.jetbrains.annotations.*;
-import org.jsr166.*;
import java.io.*;
import java.net.*;
-import java.text.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.IgniteNodeAttributes.*;
-import static org.apache.ignite.spi.IgnitePortProtocol.*;
-import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
-import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage.*;
-import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
-
/**
* Discovery SPI implementation that uses TCP/IP for node discovery.
* <p>
@@ -150,10 +136,43 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
@IgniteSpiMultipleInstancesSupport(true)
@DiscoverySpiOrderSupport(true)
@DiscoverySpiHistorySupport(true)
-public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscoverySpiMBean {
+public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+ /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
+ public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
+
/** Default local port range (value is <tt>100</tt>). */
public static final int DFLT_PORT_RANGE = 100;
+ /** Default port to listen (value is <tt>47500</tt>). */
+ public static final int DFLT_PORT = 47500;
+
+ /** Default timeout for joining topology (value is <tt>0</tt>). */
+ public static final long DFLT_JOIN_TIMEOUT = 0;
+
+ /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
+ public static final long DFLT_NETWORK_TIMEOUT = 5000;
+
+ /** Default value for thread priority (value is <tt>10</tt>). */
+ public static final int DFLT_THREAD_PRI = 10;
+
+ /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */
+ public static final long DFLT_HEARTBEAT_FREQ = 100;
+
+ /** Default size of topology snapshots history. */
+ public static final int DFLT_TOP_HISTORY_SIZE = 1000;
+
+ /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
+ public static final long DFLT_SOCK_TIMEOUT = 200;
+
+ /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */
+ public static final long DFLT_ACK_TIMEOUT = 50;
+
+ /** Default socket operations timeout in milliseconds for client mode (value is <tt>700ms</tt>). */
+ public static final long DFLT_SOCK_TIMEOUT_CLIENT = 700;
+
+ /** Default timeout for receiving message acknowledgement in milliseconds for client mode (value is <tt>700ms</tt>). */
+ public static final long DFLT_ACK_TIMEOUT_CLIENT = 700;
+
/** Default reconnect attempts count (value is <tt>10</tt>). */
public static final int DFLT_RECONNECT_CNT = 10;
@@ -172,154 +191,236 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
/** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
- /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
- public static final long DFLT_SOCK_TIMEOUT = 200;
-
- /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */
- public static final long DFLT_ACK_TIMEOUT = 50;
-
- /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
- public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
+ /** Local address. */
+ protected String locAddr;
/** Address resolver. */
private AddressResolver addrRslvr;
- /** Local port which node uses. */
- private int locPort = DFLT_PORT;
+ /** IP finder. */
+ protected TcpDiscoveryIpFinder ipFinder;
- /** Local port range. */
- private int locPortRange = DFLT_PORT_RANGE;
+ /** Socket operations timeout. */
+ protected long sockTimeout; // Must be initialized in the constructor of child class.
- /** Statistics print frequency. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
- private long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
+ /** Message acknowledgement timeout. */
+ protected long ackTimeout; // Must be initialized in the constructor of child class.
- /** Maximum message acknowledgement timeout. */
- private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
+ /** Network timeout. */
+ protected long netTimeout = DFLT_NETWORK_TIMEOUT;
- /** Max heartbeats count node can miss without initiating status check. */
- private int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
+ /** Join timeout. */
+ @SuppressWarnings("RedundantFieldInitialization")
+ protected long joinTimeout = DFLT_JOIN_TIMEOUT;
- /** Max heartbeats count node can miss without failing client node. */
- private int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
+ /** Thread priority for all threads started by SPI. */
+ protected int threadPri = DFLT_THREAD_PRI;
- /** IP finder clean frequency. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ;
+ /** Heartbeat messages issuing frequency. */
+ protected long hbFreq = DFLT_HEARTBEAT_FREQ;
- /** Reconnect attempts count. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private int reconCnt = DFLT_RECONNECT_CNT;
+ /** Size of topology snapshots history. */
+ protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
- /** */
- private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
+ /** Grid discovery listener. */
+ protected volatile DiscoverySpiListener lsnr;
- /** Nodes ring. */
- @GridToStringExclude
- private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
+ /** Data exchange. */
+ protected DiscoverySpiDataExchange exchange;
+
+ /** Metrics provider. */
+ protected DiscoveryMetricsProvider metricsProvider;
- /** Topology snapshots history. */
- private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+ /** Local node attributes. */
+ protected Map<String, Object> locNodeAttrs;
- /** Socket readers. */
- private final Collection<SocketReader> readers = new LinkedList<>();
+ /** Local node version. */
+ protected IgniteProductVersion locNodeVer;
- /** TCP server for discovery SPI. */
- private TcpServer tcpSrvr;
+ /** Local node. */
+ protected TcpDiscoveryNode locNode;
- /** Message worker. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private RingMessageWorker msgWorker;
+ /** Local host. */
+ protected InetAddress locHost;
- /** Client message workers. */
- private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
+ /** Internal and external addresses of local node. */
+ protected Collection<InetSocketAddress> locNodeAddrs;
- /** Metrics sender. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private HeartbeatsSender hbsSnd;
+ /** Socket timeout worker. */
+ protected SocketTimeoutWorker sockTimeoutWorker;
- /** Status checker. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private CheckStatusSender chkStatusSnd;
+ /** Start time of the very first grid node. */
+ protected volatile long gridStartTime;
- /** IP finder cleaner. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private IpFinderCleaner ipFinderCleaner;
+ /** Marshaller. */
+ protected final Marshaller marsh = new JdkMarshaller();
- /** Statistics printer thread. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private StatisticsPrinter statsPrinter;
+ /** Statistics. */
+ protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics();
- /** Failed nodes (but still in topology). */
- private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+ /** Local port which node uses. */
+ protected int locPort = DFLT_PORT;
+
+ /** Local port range. */
+ protected int locPortRange = DFLT_PORT_RANGE;
+
+ /** Reconnect attempts count. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ protected int reconCnt = DFLT_RECONNECT_CNT;
- /** Leaving nodes (but still in topology). */
- private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+ /** Statistics print frequency. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
+ protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
- /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
- private boolean ipFinderHasLocAddr;
+ /** Maximum message acknowledgement timeout. */
+ protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
- /** Addresses that do not respond during join requests send (for resolving concurrent start). */
- private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>();
+ /** Max heartbeats count node can miss without initiating status check. */
+ protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
- /** Addresses that incoming join requests send were send from (for resolving concurrent start). */
- private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>();
+ /** Max heartbeats count node can miss without failing client node. */
+ protected int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
- /** Response on join request from coordinator (in case of duplicate ID or auth failure). */
- private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1();
+ /** IP finder clean frequency. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ protected long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ;
/** Context initialization latch. */
@GridToStringExclude
private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
- /** Node authenticator. */
- private DiscoverySpiNodeAuthenticator nodeAuth;
+ /** */
+ protected final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs =
+ new CopyOnWriteArrayList<>();
+
+ /** */
+ protected final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs =
+ new CopyOnWriteArrayList<>();
+
+ /** Logger. */
+ @LoggerResource
+ protected IgniteLogger log;
+
+ /** */
+ protected TcpDiscoveryImpl impl;
+
+ /** */
+ private boolean clientMode;
+
+ /** {@inheritDoc} */
+ @Override public String getSpiState() {
+ return impl.getSpiState();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMessageWorkerQueueSize() {
+ return impl.getMessageWorkerQueueSize();
+ }
- /** Mutex. */
- private final Object mux = new Object();
+ /** {@inheritDoc} */
+ @Nullable @Override public UUID getCoordinator() {
+ return impl.getCoordinator();
+ }
- /** Discovery state. */
- protected TcpDiscoverySpiState spiState = DISCONNECTED;
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> getRemoteNodes() {
+ return impl.getRemoteNodes();
+ }
- /** Map with proceeding ping requests. */
- private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
- new ConcurrentHashMap8<>();
+ /** {@inheritDoc} */
+ @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+ return impl.getNode(nodeId);
+ }
- /** Debug mode. */
- private boolean debugMode;
+ /** {@inheritDoc} */
+ @Override public boolean pingNode(UUID nodeId) {
+ return impl.pingNode(nodeId);
+ }
- /** Debug messages history. */
- private int debugMsgHist = 512;
+ /** {@inheritDoc} */
+ @Override public void disconnect() throws IgniteSpiException {
+ impl.disconnect();
+ }
- /** Received messages. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private ConcurrentLinkedDeque<String> debugLog;
+ /** {@inheritDoc} */
+ @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+ impl.setAuthenticator(auth);
+ }
- /** */
- private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs =
- new CopyOnWriteArrayList<>();
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+ impl.sendCustomEvent(msg);
+ }
- /** */
- private final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs =
- new CopyOnWriteArrayList<>();
+ /** {@inheritDoc} */
+ @Override public void failNode(UUID nodeId) {
+ impl.failNode(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dumpDebugInfo() {
+ impl.dumpDebugInfo(log);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClientMode() {
+ return clientMode;
+ }
/**
- * Default constructor.
+ * @param clientMode New client mode.
*/
- public TcpDiscoverySpi() {
- ackTimeout = DFLT_ACK_TIMEOUT;
- sockTimeout = DFLT_SOCK_TIMEOUT;
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setClientMode(boolean clientMode) {
+ if (impl != null)
+ throw new IllegalStateException("You cannot change mode, TcpDiscoverySpi already started.");
+
+ this.clientMode = clientMode;
+
+ return this;
}
- /** {@inheritDoc} */
+ /**
+ * Injects resources; takes the local address and address resolver from the Ignite configuration.
+ *
+ * @param ignite Ignite.
+ */
@IgniteInstanceResource
- @Override public void injectResources(Ignite ignite) {
+ @Override protected void injectResources(Ignite ignite) {
super.injectResources(ignite);
// Inject resource.
- if (ignite != null)
+ if (ignite != null) {
+ setLocalAddress(ignite.configuration().getLocalHost());
setAddressResolver(ignite.configuration().getAddressResolver());
+ }
+ }
+
+ /**
+ * Sets local host IP address that discovery SPI uses.
+ * <p>
+ * If not provided, by default a first found non-loopback address
+ * will be used. If there is no non-loopback address available,
+ * then {@link InetAddress#getLocalHost()} will be used.
+ *
+ * @param locAddr IP address.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setLocalAddress(String locAddr) {
+ // Injection should not override value already set by Spring or user.
+ if (this.locAddr == null)
+ this.locAddr = locAddr;
+
+ return this;
+ }
+
+ /**
+ * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method.
+ *
+ * @return Local address, or {@code null} if it has not been set.
+ */
+ public String getLocalAddress() {
+ return locAddr;
}
/**
@@ -360,8 +461,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
* @see #setAckTimeout(long)
*/
@IgniteSpiConfiguration(optional = true)
- public void setReconnectCount(int reconCnt) {
+ public TcpDiscoverySpi setReconnectCount(int reconCnt) {
this.reconCnt = reconCnt;
+
+ return this;
}
/** {@inheritDoc} */
@@ -382,8 +485,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
* @param maxAckTimeout Maximum acknowledgement timeout.
*/
@IgniteSpiConfiguration(optional = true)
- public void setMaxAckTimeout(long maxAckTimeout) {
+ public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) {
this.maxAckTimeout = maxAckTimeout;
+
+ return this;
}
/** {@inheritDoc} */
@@ -401,8 +506,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
* @param locPort Local port to bind.
*/
@IgniteSpiConfiguration(optional = true)
- public void setLocalPort(int locPort) {
+ public TcpDiscoverySpi setLocalPort(int locPort) {
this.locPort = locPort;
+
+ return this;
}
/** {@inheritDoc} */
@@ -420,8 +527,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
* @param locPortRange Local port range to bind.
*/
@IgniteSpiConfiguration(optional = true)
- public void setLocalPortRange(int locPortRange) {
+ public TcpDiscoverySpi setLocalPortRange(int locPortRange) {
this.locPortRange = locPortRange;
+
+ return this;
}
/** {@inheritDoc} */
@@ -437,8 +546,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
* @param maxMissedHbs Max missed heartbeats.
*/
@IgniteSpiConfiguration(optional = true)
- public void setMaxMissedHeartbeats(int maxMissedHbs) {
+ public TcpDiscoverySpi setMaxMissedHeartbeats(int maxMissedHbs) {
this.maxMissedHbs = maxMissedHbs;
+
+ return this;
}
/** {@inheritDoc} */
@@ -454,8 +565,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
* @param maxMissedClientHbs Max missed client heartbeats.
*/
@IgniteSpiConfiguration(optional = true)
- public void setMaxMissedClientHeartbeats(int maxMissedClientHbs) {
+ public TcpDiscoverySpi setMaxMissedClientHeartbeats(int maxMissedClientHbs) {
this.maxMissedClientHbs = maxMissedClientHbs;
+
+ return this;
}
/** {@inheritDoc} */
@@ -475,8 +588,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
* @param statsPrintFreq Statistics print frequency in milliseconds.
*/
@IgniteSpiConfiguration(optional = true)
- public void setStatisticsPrintFrequency(long statsPrintFreq) {
+ public TcpDiscoverySpi setStatisticsPrintFrequency(long statsPrintFreq) {
this.statsPrintFreq = statsPrintFreq;
+
+ return this;
}
/** {@inheritDoc} */
@@ -492,112 +607,187 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
* @param ipFinderCleanFreq IP finder clean frequency.
*/
@IgniteSpiConfiguration(optional = true)
- public void setIpFinderCleanFrequency(long ipFinderCleanFreq) {
+ public TcpDiscoverySpi setIpFinderCleanFrequency(long ipFinderCleanFreq) {
this.ipFinderCleanFreq = ipFinderCleanFreq;
+
+ return this;
}
/**
- * This method is intended for troubleshooting purposes only.
+ * Gets IP finder for IP addresses sharing and storing.
*
- * @param debugMode {code True} to start SPI in debug mode.
+ * @return IP finder for IP addresses sharing and storing.
*/
- public void setDebugMode(boolean debugMode) {
- this.debugMode = debugMode;
+ public TcpDiscoveryIpFinder getIpFinder() {
+ return ipFinder;
}
/**
- * This method is intended for troubleshooting purposes only.
+ * Sets IP finder for IP addresses sharing and storing.
+ * <p>
+ * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
*
- * @param debugMsgHist Message history log size.
+ * @param ipFinder IP finder.
*/
- public void setDebugMessageHistory(int debugMsgHist) {
- this.debugMsgHist = debugMsgHist;
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setIpFinder(TcpDiscoveryIpFinder ipFinder) {
+ this.ipFinder = ipFinder;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public String getSpiState() {
- synchronized (mux) {
- return spiState.name();
- }
+ /**
+ * Sets socket operations timeout. This timeout is used to limit connection time and
+ * write-to-socket time.
+ * <p>
+ * Note that when running Ignite on Amazon EC2, socket timeout must be set to a value
+ * significantly greater than the default (e.g. to {@code 30000}).
+ * <p>
+ * If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}.
+ *
+ * @param sockTimeout Socket connection timeout.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setSocketTimeout(long sockTimeout) {
+ this.sockTimeout = sockTimeout;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public int getMessageWorkerQueueSize() {
- return msgWorker.queueSize();
+ /**
+ * Sets timeout for receiving acknowledgement for sent message.
+ * <p>
+ * If acknowledgement is not received within this timeout, sending is considered as failed
+ * and SPI tries to repeat message sending.
+ * <p>
+ * If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}.
+ *
+ * @param ackTimeout Acknowledgement timeout.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setAckTimeout(long ackTimeout) {
+ this.ackTimeout = ackTimeout;
+
+ return this;
}
- /** {@inheritDoc} */
- @Nullable @Override public UUID getCoordinator() {
- TcpDiscoveryNode crd = resolveCoordinator();
+ /**
+ * Sets maximum network timeout to use for network operations.
+ * <p>
+ * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
+ *
+ * @param netTimeout Network timeout.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setNetworkTimeout(long netTimeout) {
+ this.netTimeout = netTimeout;
- return crd != null ? crd.id() : null;
+ return this;
}
/** {@inheritDoc} */
- @Nullable @Override public ClusterNode getNode(UUID nodeId) {
- assert nodeId != null;
-
- UUID locNodeId0 = getLocalNodeId();
+ @Override public long getJoinTimeout() {
+ return joinTimeout;
+ }
- if (locNodeId0 != null && locNodeId0.equals(nodeId))
- // Return local node directly.
- return locNode;
+ /**
+ * Sets join timeout.
+ * <p>
+ * If non-shared IP finder is used and node fails to connect to
+ * any address from IP finder, node keeps trying to join within this
+ * timeout. If all addresses are still unresponsive, exception is thrown
+ * and node startup fails.
+ * <p>
+ * If not specified, default is {@link #DFLT_JOIN_TIMEOUT}.
+ *
+ * @param joinTimeout Join timeout ({@code 0} means wait forever).
+ *
+ * @see TcpDiscoveryIpFinder#isShared()
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setJoinTimeout(long joinTimeout) {
+ this.joinTimeout = joinTimeout;
- TcpDiscoveryNode node = ring.node(nodeId);
+ return this;
+ }
- if (node != null && !node.visible())
- return null;
+ /**
+ * Sets thread priority. All threads within SPI will be started with it.
+ * <p>
+ * If not provided, default value is {@link #DFLT_THREAD_PRI}.
+ *
+ * @param threadPri Thread priority.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setThreadPriority(int threadPri) {
+ this.threadPri = threadPri;
- return node;
+ return this;
}
- /** {@inheritDoc} */
- @Override public Collection<ClusterNode> getRemoteNodes() {
- return F.upcast(ring.visibleRemoteNodes());
+ /**
+ * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
+ * in configurable time interval to other nodes to notify them about its state.
+ * <p>
+ * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
+ *
+ * @param hbFreq Heartbeat frequency in milliseconds.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setHeartbeatFrequency(long hbFreq) {
+ this.hbFreq = hbFreq;
+
+ return this;
}
- /** {@inheritDoc} */
- @Override public void spiStart(String gridName) throws IgniteSpiException {
- spiStart0(false);
+ /**
+ * @return Size of topology snapshots history.
+ */
+ public long getTopHistorySize() {
+ return topHistSize;
}
/**
- * Starts or restarts SPI after stop (to reconnect).
+ * Sets size of topology snapshots history. Specified size should be greater than or equal to default size
+ * {@link #DFLT_TOP_HISTORY_SIZE}.
*
- * @param restart {@code True} if SPI is restarted after stop.
- * @throws IgniteSpiException If failed.
+ * @param topHistSize Size of topology snapshots history.
*/
- private void spiStart0(boolean restart) throws IgniteSpiException {
- if (!restart)
- // It is initial start.
- onSpiStart();
+ @IgniteSpiConfiguration(optional = true)
+ public TcpDiscoverySpi setTopHistorySize(int topHistSize) {
+ if (topHistSize < DFLT_TOP_HISTORY_SIZE) {
+ U.warn(log, "Topology history size should be greater than or equal to default size. " +
+ "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize +
+ ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']');
- synchronized (mux) {
- spiState = DISCONNECTED;
+ return this;
}
- if (debugMode) {
- if (!log.isInfoEnabled())
- throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
- "in debug mode.");
+ this.topHistSize = topHistSize;
- debugLog = new ConcurrentLinkedDeque<>();
-
- U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode.");
- }
+ return this;
+ }
- // Clear addresses collections.
- fromAddrs.clear();
- noResAddrs.clear();
+ /** {@inheritDoc} */
+ @Override public TcpDiscoverySpi setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
+ assert locNodeAttrs == null;
+ assert locNodeVer == null;
- sockTimeoutWorker = new SocketTimeoutWorker();
- sockTimeoutWorker.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Node attributes to set: " + attrs);
+ log.debug("Node version to set: " + ver);
+ }
- msgWorker = new RingMessageWorker();
- msgWorker.start();
+ locNodeAttrs = attrs;
+ locNodeVer = ver;
- tcpSrvr = new TcpServer();
+ return this;
+ }
+ /**
+ * @param srvPort Server port.
+ */
+ void initLocalNode(int srvPort, boolean addExtAddrAttr) {
// Init local node.
IgniteBiTuple<Collection<String>, Collection<String>> addrs;
@@ -612,166 +802,190 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
getLocalNodeId(),
addrs.get1(),
addrs.get2(),
- tcpSrvr.port,
+ srvPort,
metricsProvider,
locNodeVer);
- Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
- U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())),
- locNode.discoveryPort());
+ if (addExtAddrAttr) {
+ Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null :
+ U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())),
+ locNode.discoveryPort());
- if (extAddrs != null)
- locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+ locNodeAddrs = new LinkedHashSet<>();
+ locNodeAddrs.addAll(locNode.socketAddresses());
- locNode.setAttributes(locNodeAttrs);
+ if (extAddrs != null) {
+ locNodeAttrs.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
- locNode.local(true);
+ locNodeAddrs.addAll(extAddrs);
+ }
+ }
- locNodeAddrs = getNodeAddresses(locNode);
+ locNode.setAttributes(locNodeAttrs);
+ locNode.local(true);
if (log.isDebugEnabled())
log.debug("Local node initialized: " + locNode);
+ }
- // Start TCP server thread after local node is initialized.
- tcpSrvr.start();
-
- ring.localNode(locNode);
+ /**
+ * @param node Node.
+ * @return {@link LinkedHashSet} of internal and external addresses of provided node.
+ * Internal addresses placed before external addresses.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) {
+ LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses());
- if (ipFinder.isShared())
- registerLocalNodeAddress();
- else {
- if (F.isEmpty(ipFinder.getRegisteredAddresses()))
- throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " +
- "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " +
- "(specify list of IP addresses in configuration).");
+ Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
- ipFinderHasLocAddr = ipFinderHasLocalAddress();
- }
+ if (extAddrs != null)
+ res.addAll(extAddrs);
- if (statsPrintFreq > 0 && log.isInfoEnabled()) {
- statsPrinter = new StatisticsPrinter();
- statsPrinter.start();
- }
+ return res;
+ }
- stats.onJoinStarted();
+ /**
+ * @param node Node.
+ * @param sameHost Same host flag.
+ * @return {@link LinkedHashSet} of internal and external addresses of provided node.
+ * Internal addresses placed before external addresses.
+ * Internal addresses will be sorted with {@code inetAddressesComparator(sameHost)}.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node, boolean sameHost) {
+ List<InetSocketAddress> addrs = U.arrayList(node.socketAddresses());
- joinTopology();
+ Collections.sort(addrs, U.inetAddressesComparator(sameHost));
- stats.onJoinFinished();
+ LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(addrs);
- hbsSnd = new HeartbeatsSender();
- hbsSnd.start();
+ Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
- chkStatusSnd = new CheckStatusSender();
- chkStatusSnd.start();
+ if (extAddrs != null)
+ res.addAll(extAddrs);
- if (ipFinder.isShared()) {
- ipFinderCleaner = new IpFinderCleaner();
- ipFinderCleaner.start();
- }
+ return res;
+ }
- if (log.isDebugEnabled() && !restart)
- log.debug(startInfo());
+ /** {@inheritDoc} */
+ @Override public Collection<Object> injectables() {
+ return F.<Object>asList(ipFinder);
+ }
- if (restart)
- getSpiContext().registerPort(tcpSrvr.port, TCP);
+ /** {@inheritDoc} */
+ @Override public long getSocketTimeout() {
+ return sockTimeout;
}
- /**
- * @throws IgniteSpiException If failed.
- */
- @SuppressWarnings("BusyWait")
- private void registerLocalNodeAddress() throws IgniteSpiException {
- // Make sure address registration succeeded.
- while (true) {
- try {
- ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+ /** {@inheritDoc} */
+ @Override public long getAckTimeout() {
+ return ackTimeout;
+ }
- // Success.
- break;
- }
- catch (IllegalStateException e) {
- throw new IgniteSpiException("Failed to register local node address with IP finder: " +
- locNode.socketAddresses(), e);
- }
- catch (IgniteSpiException e) {
- LT.error(log, e, "Failed to register local node address in IP finder on start " +
- "(retrying every 2000 ms).");
- }
+ /** {@inheritDoc} */
+ @Override public long getNetworkTimeout() {
+ return netTimeout;
+ }
- try {
- U.sleep(2000);
- }
- catch (IgniteInterruptedCheckedException e) {
- throw new IgniteSpiException("Thread has been interrupted.", e);
- }
- }
+ /** {@inheritDoc} */
+ @Override public int getThreadPriority() {
+ return threadPri;
}
- /**
- * @throws IgniteSpiException If failed.
- */
- private void onSpiStart() throws IgniteSpiException {
- startStopwatch();
+ /** {@inheritDoc} */
+ @Override public long getHeartbeatFrequency() {
+ return hbFreq;
+ }
- checkParameters();
+ /** {@inheritDoc} */
+ @Override public String getIpFinderFormatted() {
+ return ipFinder.toString();
+ }
- assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
- assertParameter(locPort > 1023, "localPort > 1023");
- assertParameter(locPortRange >= 0, "localPortRange >= 0");
- assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff");
- assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
- assertParameter(reconCnt > 0, "reconnectCnt > 0");
- assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
- assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0");
- assertParameter(threadPri > 0, "threadPri > 0");
- assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0");
+ /** {@inheritDoc} */
+ @Override public long getNodesJoined() {
+ return stats.joinedNodesCount();
+ }
- try {
- locHost = U.resolveLocalHost(locAddr);
- }
- catch (IOException e) {
- throw new IgniteSpiException("Unknown local address: " + locAddr, e);
- }
+ /** {@inheritDoc} */
+ @Override public long getNodesLeft() {
+ return stats.leftNodesCount();
+ }
- if (log.isDebugEnabled()) {
- log.debug(configInfo("localHost", locHost.getHostAddress()));
- log.debug(configInfo("localPort", locPort));
- log.debug(configInfo("localPortRange", locPortRange));
- log.debug(configInfo("threadPri", threadPri));
- log.debug(configInfo("networkTimeout", netTimeout));
- log.debug(configInfo("sockTimeout", sockTimeout));
- log.debug(configInfo("ackTimeout", ackTimeout));
- log.debug(configInfo("maxAckTimeout", maxAckTimeout));
- log.debug(configInfo("reconnectCount", reconCnt));
- log.debug(configInfo("ipFinder", ipFinder));
- log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
- log.debug(configInfo("heartbeatFreq", hbFreq));
- log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs));
- log.debug(configInfo("statsPrintFreq", statsPrintFreq));
- }
+ /** {@inheritDoc} */
+ @Override public long getNodesFailed() {
+ return stats.failedNodesCount();
+ }
- // Warn on odd network timeout.
- if (netTimeout < 3000)
- U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
+ /** {@inheritDoc} */
+ @Override public long getPendingMessagesRegistered() {
+ return stats.pendingMessagesRegistered();
+ }
- registerMBean(gridName, this, TcpDiscoverySpiMBean.class);
+ /** {@inheritDoc} */
+ @Override public long getPendingMessagesDiscarded() {
+ return stats.pendingMessagesDiscarded();
+ }
- if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
- TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder);
+ /** {@inheritDoc} */
+ @Override public long getAvgMessageProcessingTime() {
+ return stats.avgMessageProcessingTime();
+ }
- if (mcastIpFinder.getLocalAddress() == null)
- mcastIpFinder.setLocalAddress(locAddr);
- }
+ /** {@inheritDoc} */
+ @Override public long getMaxMessageProcessingTime() {
+ return stats.maxMessageProcessingTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalReceivedMessages() {
+ return stats.totalReceivedMessages();
}
/** {@inheritDoc} */
- @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+ @Override public Map<String, Integer> getReceivedMessages() {
+ return stats.receivedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getTotalProcessedMessages() {
+ return stats.totalProcessedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Integer> getProcessedMessages() {
+ return stats.processedMessages();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCoordinatorSinceTimestamp() {
+ return stats.coordinatorSinceTimestamp();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
super.onContextInitialized0(spiCtx);
ctxInitLatch.countDown();
- spiCtx.registerPort(tcpSrvr.port, TCP);
+ ipFinder.onSpiContextInitialized(spiCtx);
+
+ impl.onContextInitialized0(spiCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onContextDestroyed0() {
+ super.onContextDestroyed0();
+
+ if (ctxInitLatch.getCount() > 0)
+ // Safety.
+ ctxInitLatch.countDown();
+
+ if (ipFinder != null)
+ ipFinder.onSpiContextDestroyed();
+
+ getSpiContext().deregisterPorts();
}
/** {@inheritDoc} */
@@ -795,4621 +1009,858 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
}
/** {@inheritDoc} */
- @Override public void spiStop() throws IgniteSpiException {
- spiStop0(false);
+ @Override public ClusterNode getLocalNode() {
+ return locNode;
}
- /**
- * Stops SPI finally or stops SPI for restart.
- *
- * @param disconnect {@code True} if SPI is being disconnected.
- * @throws IgniteSpiException If failed.
- */
- private void spiStop0(boolean disconnect) throws IgniteSpiException {
- if (ctxInitLatch.getCount() > 0)
- // Safety.
- ctxInitLatch.countDown();
+ /** {@inheritDoc} */
+ @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+ this.lsnr = lsnr;
+ }
- if (log.isDebugEnabled()) {
- if (disconnect)
- log.debug("Disconnecting SPI.");
- else
- log.debug("Preparing to start local node stop procedure.");
- }
+ /** {@inheritDoc} */
+ @Override public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange) {
+ this.exchange = exchange;
- if (disconnect) {
- synchronized (mux) {
- spiState = DISCONNECTING;
- }
- }
+ return this;
+ }
- if (msgWorker != null && msgWorker.isAlive() && !disconnect) {
- // Send node left message only if it is final stop.
- msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(getLocalNodeId()));
+ /** {@inheritDoc} */
+ @Override public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+ this.metricsProvider = metricsProvider;
- synchronized (mux) {
- long threshold = U.currentTimeMillis() + netTimeout;
+ return this;
+ }
- long timeout = netTimeout;
+ /** {@inheritDoc} */
+ @Override public long getGridStartTime() {
+ assert gridStartTime != 0;
- while (spiState != LEFT && timeout > 0) {
- try {
- mux.wait(timeout);
+ return gridStartTime;
+ }
- timeout = threshold - U.currentTimeMillis();
- }
- catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
+ /**
+ * @param sockAddr Remote address.
+ * @return Opened socket.
+ * @throws IOException If failed.
+ */
+ protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+ assert sockAddr != null;
- break;
- }
- }
+ InetSocketAddress resolved = sockAddr.isUnresolved() ?
+ new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;
- if (spiState == LEFT) {
- if (log.isDebugEnabled())
- log.debug("Verification for local node leave has been received from coordinator" +
- " (continuing stop procedure).");
- }
- else if (log.isInfoEnabled()) {
- log.info("No verification for local node leave has been received from coordinator" +
- " (will stop node anyway).");
- }
- }
- }
+ InetAddress addr = resolved.getAddress();
- U.interrupt(tcpSrvr);
- U.join(tcpSrvr, log);
+ assert addr != null;
- Collection<SocketReader> tmp;
+ Socket sock = new Socket();
- synchronized (mux) {
- tmp = U.arrayList(readers);
- }
+ sock.bind(new InetSocketAddress(locHost, 0));
- U.interrupt(tmp);
- U.joinThreads(tmp, log);
+ sock.setTcpNoDelay(true);
- U.interrupt(hbsSnd);
- U.join(hbsSnd, log);
+ sock.connect(resolved, (int)sockTimeout);
- U.interrupt(chkStatusSnd);
- U.join(chkStatusSnd, log);
+ writeToSocket(sock, U.IGNITE_HEADER);
- U.interrupt(ipFinderCleaner);
- U.join(ipFinderCleaner, log);
+ return sock;
+ }
- U.interrupt(msgWorker);
- U.join(msgWorker, log);
+ /**
+ * Writes message to the socket.
+ *
+ * @param sock Socket.
+ * @param data Raw data to write.
+ * @throws IOException If IO failed or write timed out.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ protected void writeToSocket(Socket sock, byte[] data) throws IOException {
+ assert sock != null;
+ assert data != null;
- U.interrupt(sockTimeoutWorker);
- U.join(sockTimeoutWorker, log);
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
- U.interrupt(statsPrinter);
- U.join(statsPrinter, log);
+ sockTimeoutWorker.addTimeoutObject(obj);
- if (ipFinder != null)
- ipFinder.close();
+ IOException err = null;
- Collection<TcpDiscoveryNode> rmts = null;
+ try {
+ OutputStream out = sock.getOutputStream();
- if (!disconnect) {
- // This is final stop.
- unregisterMBean();
+ out.write(data);
- if (log.isDebugEnabled())
- log.debug(stopInfo());
+ out.flush();
}
- else {
- getSpiContext().deregisterPorts();
-
- rmts = ring.visibleRemoteNodes();
+ catch (IOException e) {
+ err = e;
}
+ finally {
+ boolean cancelled = obj.cancel();
- long topVer = ring.topologyVersion();
-
- ring.clear();
-
- if (rmts != null && !rmts.isEmpty()) {
- // This is restart/disconnection and remote nodes are not empty.
- // We need to fire FAIL event for each.
- DiscoverySpiListener lsnr = this.lsnr;
+ if (cancelled)
+ sockTimeoutWorker.removeTimeoutObject(obj);
- if (lsnr != null) {
- Set<ClusterNode> processed = new HashSet<>();
+ // Throw original exception.
+ if (err != null)
+ throw err;
- for (TcpDiscoveryNode n : rmts) {
- assert n.visible();
+ if (!cancelled)
+ throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
+ }
+ }
- processed.add(n);
+ /**
+ * Writes message to the socket.
+ *
+ * @param sock Socket.
+ * @param msg Message.
+ * @throws IOException If IO failed or write timed out.
+ * @throws IgniteCheckedException If marshalling failed.
+ */
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException {
+ writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
+ }
- List<ClusterNode> top = U.arrayList(rmts, F.notIn(processed));
+ /**
+ * Writes message to the socket.
+ *
+ * @param sock Socket.
+ * @param msg Message.
+ * @param bout Byte array output stream.
+ * @throws IOException If IO failed or write timed out.
+ * @throws IgniteCheckedException If marshalling failed.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
+ throws IOException, IgniteCheckedException {
+ assert sock != null;
+ assert msg != null;
+ assert bout != null;
- topVer++;
+ // Marshal the message first so that the timed section below covers only the socket write.
+ marsh.marshal(msg, bout);
- Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer,
- Collections.unmodifiableList(top));
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
- lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null);
- }
- }
- }
+ sockTimeoutWorker.addTimeoutObject(obj);
- printStatistics();
+ IOException err = null;
- stats.clear();
+ try {
+ OutputStream out = sock.getOutputStream();
- synchronized (mux) {
- // Clear stored data.
- leavingNodes.clear();
- failedNodes.clear();
+ bout.writeTo(out);
- spiState = DISCONNECTED;
+ out.flush();
}
- }
+ catch (IOException e) {
+ err = e;
+ }
+ finally {
+ boolean cancelled = obj.cancel();
- /** {@inheritDoc} */
- @Override protected void onContextDestroyed0() {
- super.onContextDestroyed0();
+ if (cancelled)
+ sockTimeoutWorker.removeTimeoutObject(obj);
- if (ctxInitLatch.getCount() > 0)
- // Safety.
- ctxInitLatch.countDown();
+ // Throw original exception.
+ if (err != null)
+ throw err;
- getSpiContext().deregisterPorts();
+ if (!cancelled)
+ throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
+ }
}
/**
- * @throws IgniteSpiException If any error occurs.
- * @return {@code true} if IP finder contains local address.
+ * Writes response to the socket.
+ *
+ * @param sock Socket.
+ * @param res Integer response.
+ * @throws IOException If IO failed or write timed out.
*/
- private boolean ipFinderHasLocalAddress() throws IgniteSpiException {
- for (InetSocketAddress locAddr : locNodeAddrs) {
- for (InetSocketAddress addr : registeredAddresses())
- try {
- int port = addr.getPort();
-
- InetSocketAddress resolved = addr.isUnresolved() ?
- new InetSocketAddress(InetAddress.getByName(addr.getHostName()), port) :
- new InetSocketAddress(addr.getAddress(), port);
-
- if (resolved.equals(locAddr))
- return true;
- }
- catch (UnknownHostException e) {
- onException(e.getMessage(), e);
- }
- }
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ protected void writeToSocket(Socket sock, int res) throws IOException {
+ assert sock != null;
- return false;
- }
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
- /** {@inheritDoc} */
- @Override public boolean pingNode(UUID nodeId) {
- assert nodeId != null;
+ sockTimeoutWorker.addTimeoutObject(obj);
- if (log.isDebugEnabled())
- log.debug("Pinging node: " + nodeId + "].");
+ OutputStream out = sock.getOutputStream();
- if (nodeId == getLocalNodeId())
- return true;
+ IOException err = null;
- TcpDiscoveryNode node = ring.node(nodeId);
+ try {
+ out.write(res);
- if (node == null || !node.visible())
- return false;
+ out.flush();
+ }
+ catch (IOException e) {
+ err = e;
+ }
+ finally {
+ boolean cancelled = obj.cancel();
- boolean res = pingNode(node);
+ if (cancelled)
+ sockTimeoutWorker.removeTimeoutObject(obj);
- if (!res && !node.isClient()) {
- LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId);
+ // Throw original exception.
+ if (err != null)
+ throw err;
- msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id()));
+ if (!cancelled)
+ throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
}
-
- return res;
}
/**
- * Pings the remote node to see if it's alive.
+ * Reads message from the socket limiting read time.
*
- * @param node Node.
- * @return {@code True} if ping succeeds.
+ * @param sock Socket.
+ * @param in Input stream (in case socket stream was wrapped).
+ * @param timeout Socket timeout for this operation.
+ * @return Message.
+ * @throws IOException If IO failed or read timed out.
+ * @throws IgniteCheckedException If unmarshalling failed.
*/
- private boolean pingNode(TcpDiscoveryNode node) {
- assert node != null;
-
- if (node.id().equals(getLocalNodeId()))
- return true;
-
- UUID clientNodeId = null;
+ protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException {
+ assert sock != null;
- if (node.isClient()) {
- clientNodeId = node.id();
+ int oldTimeout = sock.getSoTimeout();
- node = ring.node(node.clientRouterNodeId());
+ try {
+ sock.setSoTimeout((int)timeout);
- if (node == null || !node.visible())
- return false;
+ return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
}
+ catch (IOException | IgniteCheckedException e) {
+ if (X.hasCause(e, SocketTimeoutException.class))
+ LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " +
+ "in long GC pauses on remote node. Current timeout: " + timeout + '.');
- for (InetSocketAddress addr : getNodeAddresses(node, U.sameMacs(locNode, node))) {
+ throw e;
+ }
+ finally {
+ // Quietly restore timeout.
try {
- // ID returned by the node should be the same as ID of the parameter for ping to succeed.
- IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
-
- return node.id().equals(t.get1()) && (clientNodeId == null || t.get2());
+ sock.setSoTimeout(oldTimeout);
}
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']');
-
- onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e);
- // continue;
+ catch (SocketException ignored) {
+ // No-op.
}
}
-
- return false;
}
/**
- * Pings the node by its address to see if it's alive.
+ * Reads message delivery receipt from the socket.
*
- * @param addr Address of the node.
- * @return ID of the remote node and "client exists" flag if node alive.
- * @throws IgniteSpiException If an error occurs.
+ * @param sock Socket.
+ * @param timeout Socket timeout for this operation.
+ * @return Receipt.
+ * @throws IOException If IO failed or read timed out.
*/
- private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId)
- throws IgniteCheckedException {
- assert addr != null;
+ protected int readReceipt(Socket sock, long timeout) throws IOException {
+ assert sock != null;
- UUID locNodeId = getLocalNodeId();
+ int oldTimeout = sock.getSoTimeout();
+
+ try {
+ sock.setSoTimeout((int)timeout);
- if (F.contains(locNodeAddrs, addr)) {
- if (clientNodeId == null)
- return F.t(getLocalNodeId(), false);
+ int res = sock.getInputStream().read();
- ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId);
+ if (res == -1)
+ throw new EOFException();
- if (clientWorker == null)
- return F.t(getLocalNodeId(), false);
+ return res;
+ }
+ catch (SocketTimeoutException e) {
+ LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " +
+ "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " +
+ "configuration property). Will retry to send message with increased timeout. " +
+ "Current timeout: " + timeout + '.');
- boolean clientPingRes;
+ stats.onAckTimeout();
+ throw e;
+ }
+ finally {
+ // Quietly restore timeout.
try {
- clientPingRes = clientWorker.ping();
+ sock.setSoTimeout(oldTimeout);
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
+ catch (SocketException ignored) {
+ // No-op.
}
-
- return F.t(getLocalNodeId(), clientPingRes);
}
+ }
- GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();
-
- IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
+ /**
+ * Resolves addresses registered in the IP finder, removes duplicates and local host
+ * address and returns the resulting collection.
+ *
+ * @return Resolved addresses without duplicates and local address (potentially
+ * empty but never null).
+ * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
+ */
+ protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException {
+ List<InetSocketAddress> res = new ArrayList<>();
- if (oldFut != null)
- return oldFut.get();
- else {
- Collection<Throwable> errs = null;
+ Collection<InetSocketAddress> addrs;
+ // Get consistent addresses collection.
+ while (true) {
try {
- Socket sock = null;
-
- for (int i = 0; i < reconCnt; i++) {
- try {
- if (addr.isUnresolved())
- addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
+ addrs = registeredAddresses();
- long tstamp = U.currentTimeMillis();
-
- sock = openSocket(addr);
+ break;
+ }
+ catch (IgniteSpiException e) {
+ LT.error(log, e, "Failed to get registered addresses from IP finder on start " +
+ "(retrying every 2000 ms).");
+ }
- writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+ try {
+ U.sleep(2000);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteSpiException("Thread has been interrupted.", e);
+ }
+ }
- TcpDiscoveryPingResponse res = readMessage(sock, null, netTimeout);
+ for (InetSocketAddress addr : addrs) {
+ assert addr != null;
- if (locNodeId.equals(res.creatorNodeId())) {
- if (log.isDebugEnabled())
- log.debug("Ping response from local node: " + res);
+ try {
+ InetSocketAddress resolved = addr.isUnresolved() ?
+ new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr;
- break;
- }
+ if (locNodeAddrs == null || !locNodeAddrs.contains(resolved))
+ res.add(resolved);
+ }
+ catch (UnknownHostException ignored) {
+ LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr);
- stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+ // Add address in any case.
+ res.add(addr);
+ }
+ }
- IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists());
+ if (!res.isEmpty())
+ Collections.shuffle(res);
- fut.onDone(t);
-
- return t;
- }
- catch (IOException | IgniteCheckedException e) {
- if (errs == null)
- errs = new ArrayList<>();
-
- errs.add(e);
- }
- finally {
- U.closeQuiet(sock);
- }
- }
- }
- catch (Throwable t) {
- fut.onDone(t);
-
- if (t instanceof Error)
- throw t;
-
- throw U.cast(t);
- }
- finally {
- if (!fut.isDone())
- fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs));
-
- boolean b = pingMap.remove(addr, fut);
-
- assert b;
- }
-
- return fut.get();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void disconnect() throws IgniteSpiException {
- spiStop0(true);
- }
-
- /** {@inheritDoc} */
- @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) {
- this.nodeAuth = nodeAuth;
- }
-
- /** {@inheritDoc} */
- @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
- try {
- msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, marsh.marshal(evt)));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void failNode(UUID nodeId) {
- ClusterNode node = ring.node(nodeId);
-
- if (node != null) {
- TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
- node.id(), node.order());
-
- msgWorker.addMessage(msg);
- }
- }
-
- /**
- * Tries to join this node to topology.
- *
- * @throws IgniteSpiException If any error occurs.
- */
- private void joinTopology() throws IgniteSpiException {
- synchronized (mux) {
- assert spiState == CONNECTING || spiState == DISCONNECTED;
-
- spiState = CONNECTING;
- }
-
- SecurityCredentials locCred = (SecurityCredentials)locNode.getAttributes()
- .get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
- // Marshal credentials for backward compatibility and security.
- marshalCredentials(locNode);
-
- while (true) {
- if (!sendJoinRequestMessage()) {
- if (log.isDebugEnabled())
- log.debug("Join request message has not been sent (local node is the first in the topology).");
-
- if (nodeAuth != null) {
- // Authenticate local node.
- try {
- SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
-
- if (subj == null)
- throw new IgniteSpiException("Authentication failed for local node: " + locNode.id());
-
- Map<String, Object> attrs = new HashMap<>(locNode.attributes());
-
- attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT,
- ignite.configuration().getMarshaller().marshal(subj));
- attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
- locNode.setAttributes(attrs);
- }
- catch (IgniteException | IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e);
- }
- }
-
- locNode.order(1);
- locNode.internalOrder(1);
-
- gridStartTime = U.currentTimeMillis();
-
- locNode.visible(true);
-
- ring.clear();
-
- ring.topologyVersion(1);
-
- synchronized (mux) {
- topHist.clear();
-
- spiState = CONNECTED;
-
- mux.notifyAll();
- }
-
- notifyDiscovery(EVT_NODE_JOINED, 1, locNode);
-
- break;
- }
-
- if (log.isDebugEnabled())
- log.debug("Join request message has been sent (waiting for coordinator response).");
-
- synchronized (mux) {
- long threshold = U.currentTimeMillis() + netTimeout;
-
- long timeout = netTimeout;
-
- while (spiState == CONNECTING && timeout > 0) {
- try {
- mux.wait(timeout);
-
- timeout = threshold - U.currentTimeMillis();
- }
- catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
-
- throw new IgniteSpiException("Thread has been interrupted.");
- }
- }
-
- if (spiState == CONNECTED)
- break;
- else if (spiState == DUPLICATE_ID)
- throw duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
- else if (spiState == AUTH_FAILED)
- throw authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
- else if (spiState == CHECK_FAILED)
- throw checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
- else if (spiState == LOOPBACK_PROBLEM) {
- TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get();
-
- boolean locHostLoopback = locHost.isLoopbackAddress();
-
- String firstNode = locHostLoopback ? "local" : "remote";
-
- String secondNode = locHostLoopback ? "remote" : "local";
-
- throw new IgniteSpiException("Failed to add node to topology because " + firstNode +
- " node is configured to use loopback address, but " + secondNode + " node is not " +
- "(consider changing 'localAddress' configuration parameter) " +
- "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" +
- U.addressesAsString(msg.addresses(), msg.hostNames()) + ']');
- }
- else
- LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
- "Check remote nodes logs for possible error messages. " +
- "Note that large topology may require significant time to start. " +
- "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
- "if getting this message on the starting nodes [networkTimeout=" + netTimeout + ']');
- }
- }
-
- assert locNode.order() != 0;
- assert locNode.internalOrder() != 0;
-
- if (log.isDebugEnabled())
- log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
- }
+ return res;
+ }
/**
- * Tries to send join request message to a random node presenting in topology.
- * Address is provided by {@link TcpDiscoveryIpFinder} and message is
- * sent to first node connection succeeded to.
+ * Gets addresses registered in the IP finder, initializes addresses having no
+ * port (or 0 port) with the local node's discovery port, or {@link #DFLT_PORT} if that port is 0.
*
- * @return {@code true} if send succeeded.
- * @throws IgniteSpiException If any error occurs.
+ * @return Registered addresses.
+ * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
*/
- @SuppressWarnings({"BusyWait"})
- private boolean sendJoinRequestMessage() throws IgniteSpiException {
- TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
- collectExchangeData(getLocalNodeId()));
-
- // Time when it has been detected, that addresses from IP finder do not respond.
- long noResStart = 0;
-
- while (true) {
- Collection<InetSocketAddress> addrs = resolvedAddresses();
-
- if (F.isEmpty(addrs))
- return false;
+ protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException {
+ Collection<InetSocketAddress> res = new ArrayList<>();
- boolean retry = false;
- Collection<Exception> errs = new ArrayList<>();
+ for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) {
+ if (addr.getPort() == 0) {
+ // TcpDiscoveryNode.discoveryPort() returns the correct port for a server node and 0 for a client node.
+ int port = locNode.discoveryPort() != 0 ? locNode.discoveryPort() : DFLT_PORT;
- try (SocketMultiConnector multiConnector = new SocketMultiConnector(this, addrs, 2)) {
- GridTuple3<InetSocketAddress, Socket, Exception> tuple;
-
- while ((tuple = multiConnector.next()) != null) {
- InetSocketAddress addr = tuple.get1();
- Socket sock = tuple.get2();
- Exception ex = tuple.get3();
-
- if (ex == null) {
- assert sock != null;
-
- try {
- Integer res = sendMessageDirectly(joinReq, addr, sock);
-
- assert res != null;
-
- noResAddrs.remove(addr);
-
- // Address is responsive, reset period start.
- noResStart = 0;
-
- switch (res) {
- case RES_WAIT:
- // Concurrent startup, try sending join request again or wait if no success.
- retry = true;
-
- break;
- case RES_OK:
- if (log.isDebugEnabled())
- log.debug("Join request message has been sent to address [addr=" + addr +
- ", req=" + joinReq + ']');
-
- // Join request sending succeeded, wait for response from topology.
- return true;
-
- default:
- // Concurrent startup, try next node.
- if (res == RES_CONTINUE_JOIN) {
- if (!fromAddrs.contains(addr))
- retry = true;
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Unexpected response to join request: " + res);
-
- retry = true;
- }
-
- break;
- }
- }
- catch (IgniteSpiException e) {
- e.printStackTrace();
-
- ex = e;
- }
- }
-
- if (ex != null) {
- errs.add(ex);
-
- if (log.isDebugEnabled()) {
- IOException ioe = X.cause(ex, IOException.class);
-
- log.debug("Failed to send join request message [addr=" + addr +
- ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']');
-
- onException("Failed to send join request message [addr=" + addr +
- ", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']', ioe);
- }
-
- noResAddrs.add(addr);
- }
- }
+ addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), port) :
+ new InetSocketAddress(addr.getAddress(), port);
}
- if (retry) {
- if (log.isDebugEnabled())
- log.debug("Concurrent discovery SPI start has been detected (local node should wait).");
-
- try {
- U.sleep(2000);
- }
- catch (IgniteInterruptedCheckedException e) {
- throw new IgniteSpiException("Thread has been interrupted.", e);
- }
- }
- else if (!ipFinder.isShared() && !ipFinderHasLocAddr) {
- IgniteCheckedException e = null;
-
- if (!errs.isEmpty()) {
- e = new IgniteCheckedException("Multiple connection attempts failed.");
-
- for (Exception err : errs)
- e.addSuppressed(err);
- }
-
- if (e != null && X.hasCause(e, ConnectException.class))
- LT.warn(log, null, "Failed to connect to any address from IP finder " +
- "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
- addrs);
-
- if (joinTimeout > 0) {
- if (noResStart == 0)
- noResStart = U.currentTimeMillis();
- else if (U.currentTimeMillis() - noResStart > joinTimeout)
- throw new IgniteSpiException(
- "Failed to connect to any address from IP finder within join timeout " +
- "(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
- "on all host machines, or consider increasing 'joinTimeout' configuration property): " +
- addrs, e);
- }
-
- try {
- U.sleep(2000);
- }
- catch (IgniteInterruptedCheckedException ex) {
- throw new IgniteSpiException("Thread has been interrupted.", ex);
- }
- }
- else
- break;
+ res.add(addr);
}
- return false;
+ return res;
}
/**
- * Establishes connection to an address, sends message and returns the response (if any).
- *
- * @param msg Message to send.
- * @param addr Address to send message to.
- * @return Response read from the recipient or {@code null} if no response is supposed.
- * @throws IgniteSpiException If an error occurs.
+ * @param msg Message.
+ * @return Error.
*/
- @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, Socket sock)
- throws IgniteSpiException {
+ protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
assert msg != null;
- assert addr != null;
-
- Collection<Throwable> errs = null;
-
- long ackTimeout0 = ackTimeout;
-
- int connectAttempts = 1;
-
- boolean joinReqSent = false;
-
- UUID locNodeId = getLocalNodeId();
-
- for (int i = 0; i < reconCnt; i++) {
- // Need to set to false on each new iteration,
- // since remote node may leave in the middle of the first iteration.
- joinReqSent = false;
-
- boolean openSock = false;
-
- try {
- long tstamp = U.currentTimeMillis();
-
- if (sock == null)
- sock = openSocket(addr);
-
- openSock = true;
-
- // Handshake.
- writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
-
- TcpDiscoveryHandshakeResponse res = readMessage(sock, null, ackTimeout0);
-
- if (locNodeId.equals(res.creatorNodeId())) {
- if (log.isDebugEnabled())
- log.debug("Handshake response from local node: " + res);
-
- break;
- }
-
- stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
-
- // Send message.
- tstamp = U.currentTimeMillis();
-
- writeToSocket(sock, msg);
-
- stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
-
- if (debugMode)
- debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
- ", rmtNodeId=" + res.creatorNodeId() + ']');
-
- if (log.isDebugEnabled())
- log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
- ", rmtNodeId=" + res.creatorNodeId() + ']');
-
- // Connection has been established, but
- // join request may not be unmarshalled on remote host.
- // E.g. due to class not found issue.
- joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
-
- return readReceipt(sock, ackTimeout0);
- }
- catch (ClassCastException e) {
- // This issue is rarely reproducible on AmazonEC2, but never
- // on dedicated machines.
- if (log.isDebugEnabled())
- U.error(log, "Class cast exception on direct send: " + addr, e);
-
- onException("Class cast exception on direct send: " + addr, e);
-
- if (errs == null)
- errs = new ArrayList<>();
-
- errs.add(e);
- }
- catch (IOException | IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.error("Exception on direct send: " + e.getMessage(), e);
-
- onException("Exception on direct send: " + e.getMessage(), e);
-
- if (errs == null)
- errs = new ArrayList<>();
-
- errs.add(e);
-
- if (!openSock) {
- // Reconnect for the second time, if connection is not established.
- if (connectAttempts < 2) {
- connectAttempts++;
-
- continue;
- }
-
- break; // Don't retry if we can not establish connection.
- }
-
- if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
- ackTimeout0 *= 2;
-
- if (!checkAckTimeout(ackTimeout0))
- break;
- }
- }
- finally {
- U.closeQuiet(sock);
-
- sock = null;
- }
- }
-
- if (joinReqSent) {
- if (log.isDebugEnabled())
- log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT).");
-
- // Topology will not include this node,
- // however, warning on timed out join will be output.
- return RES_OK;
- }
- throw new IgniteSpiException(
- "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']',
- U.exceptionWithSuppressed("Failed to send message to address " +
- "[addr=" + addr + ", msg=" + msg + ']', errs));
+ return new IgniteSpiException("Local node has the same ID as existing node in topology " +
+ "(fix configuration and restart local node) [localNode=" + locNode +
+ ", existingNode=" + msg.node() + ']');
}
/**
- * Marshalls credentials with discovery SPI marshaller (will replace attribute value).
- *
- * @param node Node to marshall credentials for.
- * @throws IgniteSpiException If marshalling failed.
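+ * @param msg Authentication failed message.
+ * @return Exception wrapping an {@link IgniteAuthenticationException} that reports the creator node ID and address.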
*/
- private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
- try {
- // Use security-unsafe getter.
- Map<String, Object> attrs = new HashMap<>(node.getAttributes());
-
- attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
- marsh.marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+ protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) {
+ assert msg != null;
- node.setAttributes(attrs);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e);
- }
+ return new IgniteSpiException(new IgniteAuthenticationException("Authentication failed [nodeId=" +
+ msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']'));
}
/**
- * Unmarshalls credentials with discovery SPI marshaller (will not replace attribute value).
- *
- * @param node Node to unmarshall credentials for.
- * @return Security credentials.
- * @throws IgniteSpiException If unmarshal fails.
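+ * @param msg Check failed message.
+ * @return {@link IgniteSpiVersionCheckException} for a version incompatibility error, {@link IgniteSpiException} otherwise.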
*/
- private SecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
- try {
- byte[] credBytes = (byte[])node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
-
- if (credBytes == null)
- return null;
+ protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) {
+ assert msg != null;
- return marsh.unmarshal(credBytes, null);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
- }
+ return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) :
+ new IgniteSpiException(msg.error());
}
/**
- * @param ackTimeout Acknowledgement timeout.
- * @return {@code True} if acknowledgement timeout is less or equal to
- * maximum acknowledgement timeout, {@code false} otherwise.
+ * @param msg Message.
+ * @return {@code True} if {@link TcpDiscoveryEnsureDelivery} is found for the message class, {@code false} otherwise.
*/
- private boolean checkAckTimeout(long ackTimeout) {
- if (ackTimeout > maxAckTimeout) {
- LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
- "(consider increasing 'maxAckTimeout' configuration property) " +
- "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + maxAckTimeout + ']');
-
- return false;
- }
-
- return true;
+ protected boolean ensured(TcpDiscoveryAbstractMessage msg) {
+ return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null;
}
/**
- * Notify external listener on discovery event.
- *
- * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
- * @param topVer Topology version.
- * @param node Remote node this event is connected with.
+ * @param msg Failed message.
+ * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise.
+ * @deprecated Parsing of the error message is kept only to preserve backward compatibility. It should be removed
+ * and replaced with a separate message for failed version checks in the next major release.
*/
- private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) {
- assert type > 0;
- assert node != null;
-
- DiscoverySpiListener lsnr = this.lsnr;
-
- TcpDiscoverySpiState spiState = spiStateCopy();
-
- if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) {
- if (log.isDebugEnabled())
- log.debug("Discovery notification [node=" + node + ", spiState=" + spiState +
- ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
-
- Collection<ClusterNode> top = F.upcast(ring.visibleNodes());
-
- Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top);
-
- lsnr.onDiscovery(type, topVer, node, top, hist, null);
- }
- else if (log.isDebugEnabled())
- log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState +
- ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+ @Deprecated
+ private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) {
+ return msg.error().contains("versions are not compatible");
}
/**
- * Update topology history with new topology snapshots.
- *
- * @param topVer Topology version.
- * @param top Topology snapshot.
- * @return Copy of updated topology history.
+ * @param nodeId Node ID.
+ * @return Marshalled exchange data keyed by component, or {@code null} if the exchange returned {@code null}.
*/
- @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
- synchronized (mux) {
- if (topHist.containsKey(topVer))
- return null;
+ protected Map<Integer, byte[]> collectExchangeData(UUID nodeId) {
+ Map<Integer, Serializable> data = exchange.collect(nodeId);
- topHist.put(topVer, top);
+ if (data == null)
+ return null;
- while (topHist.size() > topHistSize)
- topHist.remove(topHist.firstKey());
+ Map<Integer, byte[]> data0 = U.newHashMap(data.size());
- if (log.isDebugEnabled())
- log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size());
+ for (Map.Entry<Integer, Serializable> entry : data.entrySet()) {
+ try {
+ byte[] bytes = marsh.marshal(entry.getValue());
- return new TreeMap<>(topHist);
+ data0.put(entry.getKey(), bytes);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal discovery data " +
+ "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e);
+ }
}
- }
- /**
- * @param msg Error message.
- * @param e Exception.
- */
- private void onException(String msg, Exception e){
- getExceptionRegistry().onException(msg, e);
+ return data0;
}
/**
- * @param node Node.
- * @return {@link LinkedHashSet} of internal and external addresses of provided node.
- * Internal addresses placed before external addresses.
+ * @param joiningNodeID Joining node ID.
+ * @param nodeId Remote node ID for which data is provided.
+ * @param data Collection of marshalled discovery data objects from different components.
+ * @param clsLdr Class loader for discovery data unmarshalling.
*/
- @SuppressWarnings("TypeMayBeWeakened")
- private LinkedHashSet<InetSocketAddress> getNodeAddresses(TcpDiscoveryNode node) {
- LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(node.socketAddresses());
-
- Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
+ p
<TRUNCATED>