You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/01 13:44:56 UTC
[04/53] [abbrv] incubator-ignite git commit: # IGNITE-943 Merge
TcpDiscoverySpi and TcpClientDiscoverySpi
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/838c0fd8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
new file mode 100644
index 0000000..7e1f592
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -0,0 +1,4792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.events.*;
+import org.apache.ignite.internal.processors.security.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.security.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.net.*;
+import java.text.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+import static org.apache.ignite.spi.IgnitePortProtocol.*;
+import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
+import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
+
+/**
+ *
+ */
+class ServerImpl extends TcpDiscoveryImpl {
+ /**
+ * Utility executor: at most one thread (core size 0, 2000 ms keep-alive) fed by an unbounded queue.
+ * NOTE(review): the tasks submitted to it are outside this part of the file - confirm the intended use.
+ */
+ private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+
+ /** Nodes ring. */
+ @GridToStringExclude
+ private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
+
+ /** Topology snapshots history. */
+ private final SortedMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
+ /** Socket readers. */
+ private final Collection<SocketReader> readers = new LinkedList<>();
+
+ /** TCP server for discovery SPI. */
+ private TcpServer tcpSrvr;
+
+ /** Message worker. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private RingMessageWorker msgWorker;
+
+ /** Client message workers. */
+ private ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
+
+ /** Metrics sender. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private HeartbeatsSender hbsSnd;
+
+ /** Status checker. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private CheckStatusSender chkStatusSnd;
+
+ /** IP finder cleaner. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private IpFinderCleaner ipFinderCleaner;
+
+ /** Statistics printer thread. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private StatisticsPrinter statsPrinter;
+
+ /** Failed nodes (but still in topology). */
+ private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+
+ /** Leaving nodes (but still in topology). */
+ private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+
+ /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
+ private boolean ipFinderHasLocAddr;
+
+ /** Addresses that did not respond while join requests were being sent (for resolving concurrent start). */
+ private final Collection<SocketAddress> noResAddrs = new GridConcurrentHashSet<>();
+
+ /** Addresses that incoming join requests were sent from (for resolving concurrent start). */
+ private final Collection<SocketAddress> fromAddrs = new GridConcurrentHashSet<>();
+
+ /** Response on join request from coordinator (in case of duplicate ID or auth failure). */
+ private final GridTuple<TcpDiscoveryAbstractMessage> joinRes = F.t1();
+
+ /** Node authenticator. */
+ private DiscoverySpiNodeAuthenticator nodeAuth;
+
+ /** Mutex. */
+ private final Object mux = new Object();
+
+ /** Discovery state. */
+ protected TcpDiscoverySpiState spiState = DISCONNECTED;
+
+ /** Map with proceeding ping requests. */
+ private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
+ new ConcurrentHashMap8<>();
+
+ /** Debug mode. */
+ private boolean debugMode;
+
+ /** Debug messages history. */
+ private int debugMsgHist = 512;
+
+ /** Received messages. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private ConcurrentLinkedDeque<String> debugLog;
+
+ /**
+ * Creates the server-side discovery implementation; the adapter is passed on to the superclass.
+ *
+ * @param adapter Adapter (the owning {@link TcpDiscoverySpi}).
+ */
+ ServerImpl(TcpDiscoverySpi adapter) {
+ super(adapter);
+ }
+
+ /**
+ * Sets the debug mode flag, which is checked in {@code spiStart()}.
+ * This method is intended for troubleshooting purposes only.
+ *
+ * @param debugMode {@code True} to start SPI in debug mode.
+ */
+ public void setDebugMode(boolean debugMode) {
+ this.debugMode = debugMode;
+ }
+
+ /**
+ * Sets the size of the debug message history (the field defaults to 512).
+ * This method is intended for troubleshooting purposes only.
+ *
+ * @param debugMsgHist Message history log size.
+ */
+ public void setDebugMessageHistory(int debugMsgHist) {
+ this.debugMsgHist = debugMsgHist;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getSpiState() {
+ synchronized (mux) {
+ return spiState.name();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMessageWorkerQueueSize() {
+ return msgWorker.queueSize();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public UUID getCoordinator() {
+     TcpDiscoveryNode crdNode = resolveCoordinator();
+
+     // No coordinator resolved - nothing to report.
+     if (crdNode == null)
+         return null;
+
+     return crdNode.id();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+ assert nodeId != null;
+
+ UUID locNodeId0 = getLocalNodeId();
+
+ if (locNodeId0 != null && locNodeId0.equals(nodeId))
+ // Return local node directly.
+ return locNode;
+
+ TcpDiscoveryNode node = ring.node(nodeId);
+
+ if (node != null && !node.visible())
+ return null;
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> getRemoteNodes() {
+     // Only visible remote nodes of the ring are exposed.
+     Collection<TcpDiscoveryNode> visibleRmts = ring.visibleRemoteNodes();
+
+     return F.upcast(visibleRmts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStart(String gridName) throws IgniteSpiException {
+ // Every start begins from the DISCONNECTED state.
+ synchronized (mux) {
+ spiState = DISCONNECTED;
+ }
+
+ // Debug mode requires info-level logging; the debug message log is allocated only in this mode.
+ if (debugMode) {
+ if (!log.isInfoEnabled())
+ throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
+ "in debug mode.");
+
+ debugLog = new ConcurrentLinkedDeque<>();
+
+ U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode.");
+ }
+
+ // Clear addresses collections.
+ fromAddrs.clear();
+ noResAddrs.clear();
+
+ // The message worker is started before the TCP server is created.
+ msgWorker = new RingMessageWorker();
+ msgWorker.start();
+
+ tcpSrvr = new TcpServer();
+
+ // Local node is initialized with the port taken from the TCP server.
+ adapter.initLocalNode(tcpSrvr.port, true);
+
+ locNode = adapter.locNode;
+
+ // Start TCP server thread after local node is initialized.
+ tcpSrvr.start();
+
+ ring.localNode(locNode);
+
+ // Shared IP finder: register local addresses. Non-shared: addresses must be preconfigured.
+ if (adapter.ipFinder.isShared())
+ registerLocalNodeAddress();
+ else {
+ if (F.isEmpty(adapter.ipFinder.getRegisteredAddresses()))
+ throw new IgniteSpiException("Non-shared IP finder must have IP addresses specified in " +
+ "GridTcpDiscoveryIpFinder.getRegisteredAddresses() configuration property " +
+ "(specify list of IP addresses in configuration).");
+
+ ipFinderHasLocAddr = adapter.ipFinderHasLocalAddress();
+ }
+
+ // Statistics printer runs only with a positive print frequency and info logging enabled.
+ if (adapter.getStatisticsPrintFrequency() > 0 && log.isInfoEnabled()) {
+ statsPrinter = new StatisticsPrinter();
+ statsPrinter.start();
+ }
+
+ adapter.stats.onJoinStarted();
+
+ // Returns once the node is connected, or throws if the join failed.
+ joinTopology();
+
+ adapter.stats.onJoinFinished();
+
+ // Background senders are started only after the join has completed.
+ hbsSnd = new HeartbeatsSender();
+ hbsSnd.start();
+
+ chkStatusSnd = new CheckStatusSender();
+ chkStatusSnd.start();
+
+ // IP finder cleaner is started only for a shared IP finder.
+ if (adapter.ipFinder.isShared()) {
+ ipFinderCleaner = new IpFinderCleaner();
+ ipFinderCleaner.start();
+ }
+
+ adapter.printStartInfo();
+ }
+
+ /**
+ * Registers the local node socket addresses with the IP finder, retrying every 2000 ms
+ * while the IP finder reports an {@link IgniteSpiException}.
+ *
+ * @throws IgniteSpiException If the IP finder throws {@link IllegalStateException} or the thread
+ *      is interrupted while waiting for the next attempt.
+ */
+ @SuppressWarnings("BusyWait")
+ private void registerLocalNodeAddress() throws IgniteSpiException {
+     // Make sure address registration succeeded.
+     for (;;) {
+         try {
+             adapter.ipFinder.initializeLocalAddresses(locNode.socketAddresses());
+
+             // Success.
+             return;
+         }
+         catch (IllegalStateException e) {
+             throw new IgniteSpiException("Failed to register local node address with IP finder: " +
+                 locNode.socketAddresses(), e);
+         }
+         catch (IgniteSpiException e) {
+             LT.error(log, e, "Failed to register local node address in IP finder on start " +
+                 "(retrying every 2000 ms).");
+         }
+
+         // Pause before the next registration attempt.
+         try {
+             U.sleep(2000);
+         }
+         catch (IgniteInterruptedCheckedException e) {
+             throw new IgniteSpiException("Thread has been interrupted.", e);
+         }
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
+ // Register the TCP server port with the SPI context under the TCP protocol.
+ spiCtx.registerPort(tcpSrvr.port, TCP);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStop() throws IgniteSpiException {
+ // Final stop (not a disconnect): a node left message is sent if the message worker is alive.
+ spiStop0(false);
+ }
+
+ /**
+ * Stops SPI finally or stops SPI for restart.
+ * <p>
+ * On final stop a node left message is queued and the method waits up to the network timeout
+ * for the state to become {@code LEFT}. Then all worker threads are interrupted and joined,
+ * the ring is cleared and the state is reset to {@code DISCONNECTED}. On disconnect,
+ * a {@code EVT_NODE_FAILED} event is fired for every remote node that was visible.
+ *
+ * @param disconnect {@code True} if SPI is being disconnected.
+ * @throws IgniteSpiException If failed.
+ */
+ private void spiStop0(boolean disconnect) throws IgniteSpiException {
+ if (log.isDebugEnabled()) {
+ if (disconnect)
+ log.debug("Disconnecting SPI.");
+ else
+ log.debug("Preparing to start local node stop procedure.");
+ }
+
+ if (disconnect) {
+ synchronized (mux) {
+ spiState = DISCONNECTING;
+ }
+ }
+
+ if (msgWorker != null && msgWorker.isAlive() && !disconnect) {
+ // Send node left message only if it is final stop.
+ msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
+
+ synchronized (mux) {
+ // Wait for the LEFT state, but no longer than the network timeout in total.
+ long threshold = U.currentTimeMillis() + adapter.netTimeout;
+
+ long timeout = adapter.netTimeout;
+
+ while (spiState != LEFT && timeout > 0) {
+ try {
+ mux.wait(timeout);
+
+ // Recompute the remaining time, since wait() may return early.
+ timeout = threshold - U.currentTimeMillis();
+ }
+ catch (InterruptedException ignored) {
+ // Restore the interrupt flag and continue with the stop procedure.
+ Thread.currentThread().interrupt();
+
+ break;
+ }
+ }
+
+ if (spiState == LEFT) {
+ if (log.isDebugEnabled())
+ log.debug("Verification for local node leave has been received from coordinator" +
+ " (continuing stop procedure).");
+ }
+ else if (log.isInfoEnabled()) {
+ log.info("No verification for local node leave has been received from coordinator" +
+ " (will stop node anyway).");
+ }
+ }
+ }
+
+ // Stop the TCP server first.
+ U.interrupt(tcpSrvr);
+ U.join(tcpSrvr, log);
+
+ Collection<SocketReader> tmp;
+
+ // Copy readers under the mutex; interrupt and join the copy outside of it.
+ synchronized (mux) {
+ tmp = U.arrayList(readers);
+ }
+
+ U.interrupt(tmp);
+ U.joinThreads(tmp, log);
+
+ U.interrupt(hbsSnd);
+ U.join(hbsSnd, log);
+
+ U.interrupt(chkStatusSnd);
+ U.join(chkStatusSnd, log);
+
+ U.interrupt(ipFinderCleaner);
+ U.join(ipFinderCleaner, log);
+
+ U.interrupt(msgWorker);
+ U.join(msgWorker, log);
+
+ U.interrupt(statsPrinter);
+ U.join(statsPrinter, log);
+
+ Collection<TcpDiscoveryNode> rmts = null;
+
+ if (!disconnect)
+ adapter.printStopInfo();
+ else {
+ adapter.getSpiContext().deregisterPorts();
+
+ // Capture visible remote nodes before the ring is cleared below.
+ rmts = ring.visibleRemoteNodes();
+ }
+
+ long topVer = ring.topologyVersion();
+
+ ring.clear();
+
+ if (rmts != null && !rmts.isEmpty()) {
+ // This is restart/disconnection and remote nodes are not empty.
+ // We need to fire FAIL event for each.
+ DiscoverySpiListener lsnr = adapter.lsnr;
+
+ if (lsnr != null) {
+ Set<ClusterNode> processed = new HashSet<>();
+
+ for (TcpDiscoveryNode n : rmts) {
+ assert n.visible();
+
+ processed.add(n);
+
+ // Topology for this event: remote nodes that have not been reported as failed yet.
+ List<ClusterNode> top = U.arrayList(rmts, F.notIn(processed));
+
+ // Each failure event gets its own topology version.
+ topVer++;
+
+ Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer,
+ Collections.unmodifiableList(top));
+
+ lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, hist, null);
+ }
+ }
+ }
+
+ printStatistics();
+
+ adapter.stats.clear();
+
+ synchronized (mux) {
+ // Clear stored data.
+ leavingNodes.clear();
+ failedNodes.clear();
+
+ spiState = DISCONNECTED;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The local node always pings successfully. A node that is not in the ring or is not visible
+ * is reported as unreachable. If the ping of a non-client node fails, a status check message
+ * for that node is queued to the message worker.
+ */
+ @Override public boolean pingNode(UUID nodeId) {
+     assert nodeId != null;
+
+     if (log.isDebugEnabled())
+         log.debug("Pinging node: " + nodeId);
+
+     // Compare IDs by value: a UUID equal to the local node ID is not necessarily the same instance.
+     if (nodeId.equals(getLocalNodeId()))
+         return true;
+
+     TcpDiscoveryNode node = ring.node(nodeId);
+
+     if (node == null || !node.visible())
+         return false;
+
+     boolean res = pingNode(node);
+
+     if (!res && !node.isClient()) {
+         LT.warn(log, null, "Failed to ping node (status check will be initiated): " + nodeId);
+
+         msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, node.id()));
+     }
+
+     return res;
+ }
+
+ /**
+ * Checks whether the given remote node is alive by pinging its addresses one by one.
+ * A client node is pinged through its router node.
+ *
+ * @param node Node.
+ * @return {@code True} if ping succeeds.
+ */
+ private boolean pingNode(TcpDiscoveryNode node) {
+     assert node != null;
+
+     if (node.id().equals(getLocalNodeId()))
+         return true;
+
+     UUID clientId = null;
+
+     if (node.isClient()) {
+         clientId = node.id();
+
+         // From here on the router of the client node is the one being contacted.
+         node = ring.node(node.clientRouterNodeId());
+
+         if (node == null || !node.visible())
+             return false;
+     }
+
+     for (InetSocketAddress addr : adapter.getNodeAddresses(node, U.sameMacs(locNode, node))) {
+         try {
+             IgniteBiTuple<UUID, Boolean> pingRes = pingNode(addr, clientId);
+
+             // ID returned by the node should be the same as ID of the parameter for ping to succeed.
+             if (!node.id().equals(pingRes.get1()))
+                 return false;
+
+             // For a client ping the router must also confirm that the client exists.
+             return clientId == null || pingRes.get2();
+         }
+         catch (IgniteCheckedException e) {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']');
+
+             onException("Failed to ping node [node=" + node + ", err=" + e.getMessage() + ']', e);
+             // Fall through to the next address.
+         }
+     }
+
+     return false;
+ }
+
+ /**
+ * Pings the node by its address to see if it's alive.
+ * <p>
+ * If the address is one of the local node addresses, no socket is opened: the local node ID is
+ * returned, and for a client ping the client's message worker (if any) is pinged instead.
+ * Concurrent pings of the same remote address share a single future registered in {@code pingMap}.
+ *
+ * @param addr Address of the node.
+ * @param clientNodeId ID of the client node to check through the pinged node, or {@code null}
+ *      for a plain node ping.
+ * @return ID of the remote node and "client exists" flag if node alive.
+ * @throws IgniteCheckedException If an error occurs.
+ */
+ private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId)
+ throws IgniteCheckedException {
+ assert addr != null;
+
+ UUID locNodeId = getLocalNodeId();
+
+ // Address belongs to the local node: answer without any network round trip.
+ if (F.contains(adapter.locNodeAddrs, addr)) {
+ if (clientNodeId == null)
+ return F.t(getLocalNodeId(), false);
+
+ ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId);
+
+ // No worker registered for this client ID: report "client does not exist".
+ if (clientWorker == null)
+ return F.t(getLocalNodeId(), false);
+
+ boolean clientPingRes;
+
+ try {
+ clientPingRes = clientWorker.ping();
+ }
+ catch (InterruptedException e) {
+ // Restore the interrupt flag before converting to a checked exception.
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedCheckedException(e);
+ }
+
+ return F.t(getLocalNodeId(), clientPingRes);
+ }
+
+ GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();
+
+ IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
+
+ // Another ping of this address is already in progress: wait for its result.
+ if (oldFut != null)
+ return oldFut.get();
+ else {
+ Collection<Throwable> errs = null;
+
+ try {
+ Socket sock = null;
+
+ // Up to reconCnt attempts; each attempt opens a fresh socket that is closed in finally.
+ for (int i = 0; i < adapter.reconCnt; i++) {
+ try {
+ if (addr.isUnresolved())
+ addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
+
+ long tstamp = U.currentTimeMillis();
+
+ sock = adapter.openSocket(addr);
+
+ adapter.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+
+ TcpDiscoveryPingResponse res = adapter.readMessage(sock, null, adapter.netTimeout);
+
+ // Response was created by the local node itself: stop trying, the future fails in finally.
+ if (locNodeId.equals(res.creatorNodeId())) {
+ if (log.isDebugEnabled())
+ log.debug("Ping response from local node: " + res);
+
+ break;
+ }
+
+ adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+
+ IgniteBiTuple<UUID, Boolean> t = F.t(res.creatorNodeId(), res.clientExists());
+
+ fut.onDone(t);
+
+ return t;
+ }
+ catch (IOException | IgniteCheckedException e) {
+ // Remember the failure and go to the next attempt.
+ if (errs == null)
+ errs = new ArrayList<>();
+
+ errs.add(e);
+ }
+ finally {
+ U.closeQuiet(sock);
+ }
+ }
+ }
+ catch (Throwable t) {
+ // Complete the future with the error so that waiters on the shared future are released.
+ fut.onDone(t);
+
+ if (t instanceof Error)
+ throw t;
+
+ throw U.cast(t);
+ }
+ finally {
+ // No attempt succeeded: complete the future with the collected errors.
+ if (!fut.isDone())
+ fut.onDone(U.exceptionWithSuppressed("Failed to ping node by address: " + addr, errs));
+
+ // Unregister this ping; the mapping is expected to still point to this future.
+ boolean b = pingMap.remove(addr, fut);
+
+ assert b;
+ }
+
+ return fut.get();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void disconnect() throws IgniteSpiException {
+ // Stop in "disconnect" mode: no node left message is sent and failure events are fired for remote nodes.
+ spiStop0(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator nodeAuth) {
+ // The authenticator is used by joinTopology() to authenticate the local node when it is first in topology.
+ this.nodeAuth = nodeAuth;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
+     try {
+         UUID creatorId = getLocalNodeId();
+
+         // The event travels both as an object and in marshalled form.
+         byte[] evtBytes = adapter.marsh.marshal(evt);
+
+         TcpDiscoveryCustomEventMessage msg = new TcpDiscoveryCustomEventMessage(creatorId, evt, evtBytes);
+
+         msgWorker.addMessage(msg);
+     }
+     catch (IgniteCheckedException e) {
+         throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void failNode(UUID nodeId) {
+     ClusterNode target = ring.node(nodeId);
+
+     // Nodes that are not in the ring are ignored.
+     if (target == null)
+         return;
+
+     msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), target.id(), target.order()));
+ }
+
+ /**
+ * Tries to join this node to topology.
+ * <p>
+ * Repeats the join attempt until the node is connected. If no join request could be sent, the local
+ * node becomes the first node in topology (order 1, topology version 1). Otherwise the method waits
+ * up to the network timeout for the state to leave {@code CONNECTING} and either finishes, fails
+ * with the error matching the resulting state, or logs a warning and retries.
+ *
+ * @throws IgniteSpiException If any error occurs.
+ */
+ private void joinTopology() throws IgniteSpiException {
+ synchronized (mux) {
+ assert spiState == CONNECTING || spiState == DISCONNECTED;
+
+ spiState = CONNECTING;
+ }
+
+ // Read credentials before they are replaced by their marshalled form below.
+ SecurityCredentials locCred = (SecurityCredentials)locNode.getAttributes()
+ .get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+ // Marshal credentials for backward compatibility and security.
+ marshalCredentials(locNode);
+
+ while (true) {
+ if (!sendJoinRequestMessage()) {
+ if (log.isDebugEnabled())
+ log.debug("Join request message has not been sent (local node is the first in the topology).");
+
+ if (nodeAuth != null) {
+ // Authenticate local node.
+ try {
+ SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
+
+ if (subj == null)
+ throw new IgniteSpiException("Authentication failed for local node: " + locNode.id());
+
+ Map<String, Object> attrs = new HashMap<>(locNode.attributes());
+
+ // Store the marshalled security subject and drop the credentials from node attributes.
+ attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT,
+ adapter.ignite().configuration().getMarshaller().marshal(subj));
+ attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+ locNode.setAttributes(attrs);
+ }
+ catch (IgniteException | IgniteCheckedException e) {
+ throw new IgniteSpiException("Failed to authenticate local node (will shutdown local node).", e);
+ }
+ }
+
+ // Local node is the first one: it gets order 1 and topology version 1.
+ locNode.order(1);
+ locNode.internalOrder(1);
+
+ adapter.gridStartTime = U.currentTimeMillis();
+
+ locNode.visible(true);
+
+ ring.clear();
+
+ ring.topologyVersion(1);
+
+ synchronized (mux) {
+ topHist.clear();
+
+ spiState = CONNECTED;
+
+ mux.notifyAll();
+ }
+
+ notifyDiscovery(EVT_NODE_JOINED, 1, locNode);
+
+ break;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Join request message has been sent (waiting for coordinator response).");
+
+ synchronized (mux) {
+ // Wait for the state to change, but no longer than the network timeout in total.
+ long threshold = U.currentTimeMillis() + adapter.netTimeout;
+
+ long timeout = adapter.netTimeout;
+
+ while (spiState == CONNECTING && timeout > 0) {
+ try {
+ mux.wait(timeout);
+
+ // Recompute the remaining time, since wait() may return early.
+ timeout = threshold - U.currentTimeMillis();
+ }
+ catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteSpiException("Thread has been interrupted.");
+ }
+ }
+
+ // Dispatch on the resulting state; for the error states joinRes holds the received response.
+ if (spiState == CONNECTED)
+ break;
+ else if (spiState == DUPLICATE_ID)
+ throw adapter.duplicateIdError((TcpDiscoveryDuplicateIdMessage)joinRes.get());
+ else if (spiState == AUTH_FAILED)
+ throw adapter.authenticationFailedError((TcpDiscoveryAuthFailedMessage)joinRes.get());
+ else if (spiState == CHECK_FAILED)
+ throw adapter.checkFailedError((TcpDiscoveryCheckFailedMessage)joinRes.get());
+ else if (spiState == LOOPBACK_PROBLEM) {
+ TcpDiscoveryLoopbackProblemMessage msg = (TcpDiscoveryLoopbackProblemMessage)joinRes.get();
+
+ boolean locHostLoopback = adapter.locHost.isLoopbackAddress();
+
+ // Name the side that uses the loopback address first in the error message.
+ String firstNode = locHostLoopback ? "local" : "remote";
+
+ String secondNode = locHostLoopback ? "remote" : "local";
+
+ throw new IgniteSpiException("Failed to add node to topology because " + firstNode +
+ " node is configured to use loopback address, but " + secondNode + " node is not " +
+ "(consider changing 'localAddress' configuration parameter) " +
+ "[locNodeAddrs=" + U.addressesAsString(locNode) + ", rmtNodeAddrs=" +
+ U.addressesAsString(msg.addresses(), msg.hostNames()) + ']');
+ }
+ else
+ // Any other state (e.g. still CONNECTING after the timeout): warn and repeat the join.
+ LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
+ "Check remote nodes logs for possible error messages. " +
+ "Note that large topology may require significant time to start. " +
+ "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
+ "if getting this message on the starting nodes [networkTimeout=" + adapter.netTimeout + ']');
+ }
+ }
+
+ assert locNode.order() != 0;
+ assert locNode.internalOrder() != 0;
+
+ if (log.isDebugEnabled())
+ log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
+ }
+
+ /**
+ * Tries to send join request message to a random node presenting in topology.
+ * Address is provided by {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is
+ * sent to first node connection succeeded to.
+ * <p>
+ * The attempt is repeated (with a 2000 ms pause) while a concurrent start is detected, or while
+ * a non-shared IP finder without the local address yields no responsive address. In the latter
+ * case the method gives up with an exception once the join timeout (if positive) has elapsed.
+ *
+ * @return {@code true} if send succeeded, {@code false} if there is no address to send the request to
+ *      or none of the addresses accepted it.
+ * @throws IgniteSpiException If the join timeout elapsed or the thread was interrupted.
+ */
+ @SuppressWarnings({"BusyWait"})
+ private boolean sendJoinRequestMessage() throws IgniteSpiException {
+     TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
+         adapter.collectExchangeData(getLocalNodeId()));
+
+     // Time when it has been detected, that addresses from IP finder do not respond.
+     long noResStart = 0;
+
+     while (true) {
+         Collection<InetSocketAddress> addrs = adapter.resolvedAddresses();
+
+         if (F.isEmpty(addrs))
+             return false;
+
+         boolean retry = false;
+         Collection<Exception> errs = new ArrayList<>();
+
+         try (SocketMultiConnector multiConnector = new SocketMultiConnector(adapter, addrs, 2)) {
+             GridTuple3<InetSocketAddress, Socket, Exception> tuple;
+
+             while ((tuple = multiConnector.next()) != null) {
+                 InetSocketAddress addr = tuple.get1();
+                 Socket sock = tuple.get2();
+                 Exception ex = tuple.get3();
+
+                 if (ex == null) {
+                     assert sock != null;
+
+                     try {
+                         Integer res = sendMessageDirectly(joinReq, addr, sock);
+
+                         assert res != null;
+
+                         noResAddrs.remove(addr);
+
+                         // Address is responsive, reset period start.
+                         noResStart = 0;
+
+                         switch (res) {
+                             case RES_WAIT:
+                                 // Concurrent startup, try sending join request again or wait if no success.
+                                 retry = true;
+
+                                 break;
+                             case RES_OK:
+                                 if (log.isDebugEnabled())
+                                     log.debug("Join request message has been sent to address [addr=" + addr +
+                                         ", req=" + joinReq + ']');
+
+                                 // Join request sending succeeded, wait for response from topology.
+                                 return true;
+
+                             default:
+                                 // Concurrent startup, try next node.
+                                 if (res == RES_CONTINUE_JOIN) {
+                                     if (!fromAddrs.contains(addr))
+                                         retry = true;
+                                 }
+                                 else {
+                                     if (log.isDebugEnabled())
+                                         log.debug("Unexpected response to join request: " + res);
+
+                                     retry = true;
+                                 }
+
+                                 break;
+                         }
+                     }
+                     catch (IgniteSpiException e) {
+                         // Not printed to stderr: the failure is collected in errs and logged below.
+                         ex = e;
+                     }
+                 }
+
+                 if (ex != null) {
+                     errs.add(ex);
+
+                     if (log.isDebugEnabled()) {
+                         IOException ioe = X.cause(ex, IOException.class);
+
+                         // Parentheses are required: '+' binds tighter than '!=' and '?:', so without them
+                         // the whole concatenation is compared to null and ioe is dereferenced even when null.
+                         String errMsg = "Failed to send join request message [addr=" + addr +
+                             ", msg=" + (ioe != null ? ioe.getMessage() : ex.getMessage()) + ']';
+
+                         log.debug(errMsg);
+
+                         onException(errMsg, ioe);
+                     }
+
+                     noResAddrs.add(addr);
+                 }
+             }
+         }
+
+         if (retry) {
+             if (log.isDebugEnabled())
+                 log.debug("Concurrent discovery SPI start has been detected (local node should wait).");
+
+             try {
+                 U.sleep(2000);
+             }
+             catch (IgniteInterruptedCheckedException e) {
+                 throw new IgniteSpiException("Thread has been interrupted.", e);
+             }
+         }
+         else if (!adapter.ipFinder.isShared() && !ipFinderHasLocAddr) {
+             IgniteCheckedException e = null;
+
+             // Aggregate all connection failures of this round into a single exception.
+             if (!errs.isEmpty()) {
+                 e = new IgniteCheckedException("Multiple connection attempts failed.");
+
+                 for (Exception err : errs)
+                     e.addSuppressed(err);
+             }
+
+             if (e != null && X.hasCause(e, ConnectException.class))
+                 LT.warn(log, null, "Failed to connect to any address from IP finder " +
+                     "(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
+                     addrs);
+
+             if (adapter.joinTimeout > 0) {
+                 // The first unresponsive round starts the clock; later rounds check it against the join timeout.
+                 if (noResStart == 0)
+                     noResStart = U.currentTimeMillis();
+                 else if (U.currentTimeMillis() - noResStart > adapter.joinTimeout)
+                     throw new IgniteSpiException(
+                         "Failed to connect to any address from IP finder within join timeout " +
+                             "(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
+                             "on all host machines, or consider increasing 'joinTimeout' configuration property): " +
+                             addrs, e);
+             }
+
+             try {
+                 U.sleep(2000);
+             }
+             catch (IgniteInterruptedCheckedException ex) {
+                 throw new IgniteSpiException("Thread has been interrupted.", ex);
+             }
+         }
+         else
+             break;
+     }
+
+     return false;
+ }
+
+ /**
+ * Establishes connection to an address, sends message and returns the response (if any).
+ * <p>
+ * Each attempt performs a handshake, writes the message and reads the receipt. Up to
+ * {@code adapter.reconCnt} attempts are made; on socket timeouts the acknowledgement timeout
+ * is doubled until it exceeds the configured maximum.
+ *
+ * @param msg Message to send.
+ * @param addr Address to send message to.
+ * @param sock Socket to use for the first attempt, or {@code null} to open a new one. The socket is
+ *      closed after every attempt, and subsequent attempts open a new socket to {@code addr}.
+ * @return Response read from the recipient or {@code null} if no response is supposed.
+ * @throws IgniteSpiException If an error occurs.
+ */
+ @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, Socket sock)
+ throws IgniteSpiException {
+ assert msg != null;
+ assert addr != null;
+
+ Collection<Throwable> errs = null;
+
+ // Working copy of the acknowledgement timeout; it is doubled on socket timeouts.
+ long ackTimeout0 = adapter.ackTimeout;
+
+ int connectAttempts = 1;
+
+ boolean joinReqSent = false;
+
+ UUID locNodeId = getLocalNodeId();
+
+ for (int i = 0; i < adapter.reconCnt; i++) {
+ // Need to set to false on each new iteration,
+ // since remote node may leave in the middle of the first iteration.
+ joinReqSent = false;
+
+ // Becomes true once a socket is available for this attempt.
+ boolean openSock = false;
+
+ try {
+ long tstamp = U.currentTimeMillis();
+
+ if (sock == null)
+ sock = adapter.openSocket(addr);
+
+ openSock = true;
+
+ // Handshake.
+ adapter.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+
+ TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0);
+
+ // Handshake was answered by the local node itself: give up on this address.
+ if (locNodeId.equals(res.creatorNodeId())) {
+ if (log.isDebugEnabled())
+ log.debug("Handshake response from local node: " + res);
+
+ break;
+ }
+
+ adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+
+ // Send message.
+ tstamp = U.currentTimeMillis();
+
+ adapter.writeToSocket(sock, msg);
+
+ adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+
+ if (debugMode)
+ debugLog("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
+ ", rmtNodeId=" + res.creatorNodeId() + ']');
+
+ if (log.isDebugEnabled())
+ log.debug("Message has been sent directly to address [msg=" + msg + ", addr=" + addr +
+ ", rmtNodeId=" + res.creatorNodeId() + ']');
+
+ // Connection has been established, but
+ // join request may not be unmarshalled on remote host.
+ // E.g. due to class not found issue.
+ joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
+
+ return adapter.readReceipt(sock, ackTimeout0);
+ }
+ catch (ClassCastException e) {
+ // This issue is rarely reproducible on AmazonEC2, but never
+ // on dedicated machines.
+ if (log.isDebugEnabled())
+ U.error(log, "Class cast exception on direct send: " + addr, e);
+
+ onException("Class cast exception on direct send: " + addr, e);
+
+ if (errs == null)
+ errs = new ArrayList<>();
+
+ errs.add(e);
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.error("Exception on direct send: " + e.getMessage(), e);
+
+ onException("Exception on direct send: " + e.getMessage(), e);
+
+ if (errs == null)
+ errs = new ArrayList<>();
+
+ errs.add(e);
+
+ if (!openSock) {
+ // Reconnect for the second time, if connection is not established.
+ if (connectAttempts < 2) {
+ connectAttempts++;
+
+ continue;
+ }
+
+ break; // Don't retry if we can not establish connection.
+ }
+
+ // On a socket timeout, double the acknowledgement timeout and stop once it exceeds the maximum.
+ if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+ ackTimeout0 *= 2;
+
+ if (!checkAckTimeout(ackTimeout0))
+ break;
+ }
+ }
+ finally {
+ // The socket is never reused between attempts.
+ U.closeQuiet(sock);
+
+ sock = null;
+ }
+ }
+
+ if (joinReqSent) {
+ // NOTE(review): the debug message below says RES_WAIT, but RES_OK is returned - confirm which is intended.
+ if (log.isDebugEnabled())
+ log.debug("Join request has been sent, but receipt has not been read (returning RES_WAIT).");
+
+ // Topology will not include this node,
+ // however, warning on timed out join will be output.
+ return RES_OK;
+ }
+
+ // All attempts failed: report the collected errors as suppressed exceptions of the cause.
+ throw new IgniteSpiException(
+ "Failed to send message to address [addr=" + addr + ", msg=" + msg + ']',
+ U.exceptionWithSuppressed("Failed to send message to address " +
+ "[addr=" + addr + ", msg=" + msg + ']', errs));
+ }
+
+ /**
+ * Replaces the security credentials attribute of the node with its form marshalled
+ * by the discovery SPI marshaller.
+ *
+ * @param node Node to marshall credentials for.
+ * @throws IgniteSpiException If marshalling failed.
+ */
+ private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
+     try {
+         // Use security-unsafe getter.
+         Map<String, Object> attrsCp = new HashMap<>(node.getAttributes());
+
+         Object cred = attrsCp.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+         attrsCp.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS, adapter.marsh.marshal(cred));
+
+         // The node receives the modified copy of its attributes.
+         node.setAttributes(attrsCp);
+     }
+     catch (IgniteCheckedException e) {
+         throw new IgniteSpiException("Failed to marshal node security credentials: " + node.id(), e);
+     }
+ }
+
+ /**
+ * Reads the marshalled security credentials attribute of the node and unmarshals it with
+ * the discovery SPI marshaller. The attribute value itself is left untouched.
+ *
+ * @param node Node to unmarshall credentials for.
+ * @return Security credentials, or {@code null} if the node has no credentials attribute.
+ * @throws IgniteSpiException If unmarshal fails.
+ */
+ private SecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
+     try {
+         Object raw = node.getAttributes().get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
+
+         byte[] marshalled = (byte[])raw;
+
+         if (marshalled != null)
+             return adapter.marsh.unmarshal(marshalled, null);
+
+         return null;
+     }
+     catch (IgniteCheckedException e) {
+         throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
+     }
+ }
+
+ /**
+ * Validates the acknowledgement timeout against the configured maximum and
+ * logs a warning when the maximum is exceeded.
+ *
+ * @param ackTimeout Acknowledgement timeout.
+ * @return {@code True} if acknowledgement timeout is less or equal to
+ * maximum acknowledgement timeout, {@code false} otherwise.
+ */
+ private boolean checkAckTimeout(long ackTimeout) {
+     // Within the limit - nothing to report.
+     if (ackTimeout <= adapter.maxAckTimeout)
+         return true;
+
+     LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
+         "(consider increasing 'maxAckTimeout' configuration property) " +
+         "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + adapter.maxAckTimeout + ']');
+
+     return false;
+ }
+
+ /**
+  * Passes a discovery event to the external listener, if one is registered.
+  * <p>
+  * The listener is invoked only when the node is visible and the SPI state is
+  * {@code CONNECTED} or {@code DISCONNECTING}; otherwise the notification is skipped
+  * and the fact is reported at debug level.
+  *
+  * @param type Discovery event type. See {@link org.apache.ignite.events.DiscoveryEvent} for more details.
+  * @param topVer Topology version.
+  * @param node Remote node this event is connected with.
+  */
+ private void notifyDiscovery(int type, long topVer, TcpDiscoveryNode node) {
+     assert type > 0;
+     assert node != null;
+
+     DiscoverySpiListener lsnr = adapter.lsnr;
+
+     TcpDiscoverySpiState spiState = spiStateCopy();
+
+     boolean deliver = lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING);
+
+     if (!deliver) {
+         if (log.isDebugEnabled())
+             log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState +
+                 ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+
+         return;
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Discovery notification [node=" + node + ", spiState=" + spiState +
+             ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']');
+
+     // Snapshot of visible nodes is recorded in history and handed to the listener.
+     Collection<ClusterNode> top = F.upcast(ring.visibleNodes());
+
+     Map<Long, Collection<ClusterNode>> hist = updateTopologyHistory(topVer, top);
+
+     lsnr.onDiscovery(type, topVer, node, top, hist, null);
+ }
+
+ /**
+  * Records a topology snapshot in the history under the given version, evicting the
+  * oldest entries while the history is larger than {@code adapter.topHistSize}.
+  *
+  * @param topVer Topology version.
+  * @param top Topology snapshot.
+  * @return Copy of updated topology history, or {@code null} if the history already
+  *      contains the given version (in which case nothing is changed).
+  */
+ @Nullable private Map<Long, Collection<ClusterNode>> updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
+     synchronized (mux) {
+         if (!topHist.containsKey(topVer)) {
+             topHist.put(topVer, top);
+
+             // Drop the smallest keys until the size limit is honored.
+             while (topHist.size() > adapter.topHistSize)
+                 topHist.remove(topHist.firstKey());
+
+             if (log.isDebugEnabled())
+                 log.debug("Added topology snapshot to history, topVer=" + topVer + ", historySize=" + topHist.size());
+
+             return new TreeMap<>(topHist);
+         }
+
+         return null;
+     }
+ }
+
+ /**
+  * Checks whether local node is coordinator. Nodes that are leaving or failed
+  * (but are still in topology) are removed from search.
+  * <p>
+  * Each time the check is positive, {@code adapter.stats.onBecomingCoordinator()} is invoked.
+  *
+  * @return {@code true} if SPI is connected and local node is the resolved coordinator.
+  */
+ private boolean isLocalNodeCoordinator() {
+     synchronized (mux) {
+         if (spiState != CONNECTED || !locNode.equals(resolveCoordinator()))
+             return false;
+
+         adapter.stats.onBecomingCoordinator();
+
+         return true;
+     }
+ }
+
+ /**
+  * Reads the current SPI state while holding {@code mux}.
+  *
+  * @return Spi state copy.
+  */
+ private TcpDiscoverySpiState spiStateCopy() {
+     synchronized (mux) {
+         return spiState;
+     }
+ }
+
+ /**
+ * Resolves coordinator. Nodes that are leaving or failed (but are still in
+ * topology) are removed from search.
+ * <p>
+ * Delegates to {@link #resolveCoordinator(Collection)} with a {@code null} filter.
+ *
+ * @return Coordinator node or {@code null} if there are no coordinator
+ * (i.e. local node is the last one and is currently stopping).
+ */
+ @Nullable private TcpDiscoveryNode resolveCoordinator() {
+ return resolveCoordinator(null);
+ }
+
+ /**
+  * Resolves coordinator among ring nodes, excluding failed nodes, leaving nodes and,
+  * when given, the nodes of the provided filter.
+  *
+  * @param filter Nodes to exclude when resolving coordinator (optional).
+  * @return Coordinator node or {@code null} if there are no coordinator
+  *      (i.e. local node is the last one and is currently stopping).
+  */
+ @Nullable private TcpDiscoveryNode resolveCoordinator(
+     @Nullable Collection<TcpDiscoveryNode> filter) {
+     synchronized (mux) {
+         Collection<TcpDiscoveryNode> failedOrLeaving = F.concat(false, failedNodes, leavingNodes);
+
+         if (F.isEmpty(filter))
+             return ring.coordinator(failedOrLeaving);
+
+         Collection<TcpDiscoveryNode> excluded = F.concat(false, failedOrLeaving, filter);
+
+         return ring.coordinator(excluded);
+     }
+ }
+
+ /**
+  * Prints SPI statistics at info level. Nothing is printed when info logging is
+  * disabled or {@code adapter.statsPrintFreq} is not positive.
+  * <p>
+  * NOTE(review): {@code heapFree} is taken from {@link Runtime#freeMemory()} while
+  * {@code heapTotal} is taken from {@link Runtime#maxMemory()}; the two values are
+  * not measured against the same total - confirm this is intended.
+  */
+ private void printStatistics() {
+     if (!log.isInfoEnabled() || adapter.statsPrintFreq <= 0)
+         return;
+
+     int failedCnt;
+     int leavingCnt;
+
+     // Sizes are read together under the lock to get a consistent pair.
+     synchronized (mux) {
+         failedCnt = failedNodes.size();
+         leavingCnt = leavingNodes.size();
+     }
+
+     Runtime rt = Runtime.getRuntime();
+
+     TcpDiscoveryNode crd = resolveCoordinator();
+
+     log.info("Discovery SPI statistics [statistics=" + adapter.stats + ", spiState=" + spiStateCopy() +
+         ", coord=" + crd +
+         ", topSize=" + ring.allNodes().size() +
+         ", leavingNodesSize=" + leavingCnt + ", failedNodesSize=" + failedCnt +
+         ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") +
+         ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) : "N/A") +
+         ", heapFree=" + rt.freeMemory() / (1024 * 1024) +
+         "M, heapTotal=" + rt.maxMemory() / (1024 * 1024) + "M]");
+ }
+
+ /**
+  * Fills a node added message with topology, pending messages and topology history
+  * when it is about to be sent to the very node being added. Messages of other types,
+  * or node added messages destined for a different node, are left untouched.
+  *
+  * @param msg Message to prepare.
+  * @param destNodeId Destination node ID.
+  * @param msgs Messages to include.
+  * @param discardMsgId Discarded message ID.
+  */
+ private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
+     @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+     assert destNodeId != null;
+
+     if (!(msg instanceof TcpDiscoveryNodeAddedMessage))
+         return;
+
+     TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+     TcpDiscoveryNode joiningNode = nodeAddedMsg.node();
+
+     if (!joiningNode.id().equals(destNodeId))
+         return;
+
+     Collection<TcpDiscoveryNode> ringNodes = ring.allNodes();
+     Collection<TcpDiscoveryNode> topToSend = new ArrayList<>(ringNodes.size());
+
+     for (TcpDiscoveryNode ringNode : ringNodes) {
+         assert ringNode.internalOrder() != 0 : ringNode;
+
+         // Skip next node and nodes added after next
+         // in case this message is resent due to failures/leaves.
+         // There will be separate messages for nodes with greater
+         // internal order.
+         if (ringNode.internalOrder() < nodeAddedMsg.node().internalOrder())
+             topToSend.add(ringNode);
+     }
+
+     nodeAddedMsg.topology(topToSend);
+     nodeAddedMsg.messages(msgs, discardMsgId);
+
+     Map<Long, Collection<ClusterNode>> histCp;
+
+     // History is copied under the lock.
+     synchronized (mux) {
+         histCp = new TreeMap<>(topHist);
+     }
+
+     nodeAddedMsg.topologyHistory(histCp);
+ }
+
+ /**
+  * Nullifies topology, topology history and pending messages of a node added message.
+  * Messages of any other type are ignored.
+  *
+  * @param msg Message to clear.
+  */
+ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) {
+     if (!(msg instanceof TcpDiscoveryNodeAddedMessage))
+         return;
+
+     // Nullify topology before registration.
+     TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+     addedMsg.topology(null);
+     addedMsg.topologyHistory(null);
+     addedMsg.messages(null, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Logs a warning and then calls {@code U.interrupt} followed by {@code U.join} on the
+ * internal threads one after another, in this order: {@code tcpSrvr}, {@code hbsSnd},
+ * {@code chkStatusSnd}, {@code ipFinderCleaner}, the socket readers (a copy of
+ * {@code readers} taken under {@code mux}), {@code msgWorker} and {@code statsPrinter}.
+ */
+ @Override void simulateNodeFailure() {
+ U.warn(log, "Simulating node failure: " + getLocalNodeId());
+
+ U.interrupt(tcpSrvr);
+ U.join(tcpSrvr, log);
+
+ U.interrupt(hbsSnd);
+ U.join(hbsSnd, log);
+
+ U.interrupt(chkStatusSnd);
+ U.join(chkStatusSnd, log);
+
+ U.interrupt(ipFinderCleaner);
+ U.join(ipFinderCleaner, log);
+
+ Collection<SocketReader> tmp;
+
+ synchronized (mux) {
+ tmp = U.arrayList(readers);
+ }
+
+ U.interrupt(tmp);
+ U.joinThreads(tmp, log);
+
+ U.interrupt(msgWorker);
+ U.join(msgWorker, log);
+
+ U.interrupt(statsPrinter);
+ U.join(statsPrinter, log);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this implementation.
+ *
+ * @throws UnsupportedOperationException Always.
+ */
+ @Override public void brakeConnection() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return The message worker ({@code msgWorker}).
+ */
+ @Override protected IgniteSpiThread workerThread() {
+ return msgWorker;
+ }
+
+ /**
+  * <strong>FOR TEST ONLY!!!</strong>
+  * <p>
+  * Simulates situation when next node is still alive but is bypassed
+  * since it has been excluded from the ring, possibly, due to short time
+  * network problems.
+  * <p>
+  * The next node (skipping already failed nodes) is looked up under {@code mux};
+  * if there is one, a node failed message for it is queued to the message worker.
+  * <p>
+  * This method is intended for test purposes only.
+  */
+ void forceNextNodeFailure() {
+     U.warn(log, "Next node will be forcibly failed (if any).");
+
+     TcpDiscoveryNode nextNode;
+
+     synchronized (mux) {
+         nextNode = ring.nextNode(failedNodes);
+     }
+
+     if (nextNode == null)
+         return;
+
+     TcpDiscoveryNodeFailedMessage failedMsg =
+         new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), nextNode.id(), nextNode.internalOrder());
+
+     msgWorker.addMessage(failedMsg);
+ }
+
+ /**
+ * <strong>FOR TEST ONLY!!!</strong>
+ * <p>
+ * Exposes the nodes ring used by this implementation. The ring instance itself
+ * is returned, not a copy.
+ * <p>
+ * This method is intended for test purposes only.
+ *
+ * @return Nodes ring.
+ */
+ TcpDiscoveryNodesRing ring() {
+ return ring;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * When {@code debugMode} is off, only a warning is logged and nothing is dumped.
+ * Otherwise a single report is built while holding {@code mux} and logged through
+ * {@code U.quietAndInfo}. The report contains, in order: local node ID, local node,
+ * SPI state, status of the internal threads, socket readers, in-memory debug log
+ * messages, IDs of leaving nodes, IDs of failed nodes and SPI statistics.
+ *
+ * @param log Logger to write the dump to. Info level is asserted to be enabled
+ * once debug mode has been confirmed.
+ */
+ public void dumpDebugInfo(IgniteLogger log) {
+ if (!debugMode) {
+ U.quietAndWarn(log, "Failed to dump debug info (discovery SPI was not configured " +
+ "in debug mode, consider setting 'debugMode' configuration property to 'true').");
+
+ return;
+ }
+
+ assert log.isInfoEnabled();
+
+ synchronized (mux) {
+ StringBuilder b = new StringBuilder(U.nl());
+
+ b.append(">>>").append(U.nl());
+ b.append(">>>").append("Dumping discovery SPI debug info.").append(U.nl());
+ b.append(">>>").append(U.nl());
+
+ b.append("Local node ID: ").append(getLocalNodeId()).append(U.nl()).append(U.nl());
+ b.append("Local node: ").append(locNode).append(U.nl()).append(U.nl());
+ b.append("SPI state: ").append(spiState).append(U.nl()).append(U.nl());
+
+ b.append("Internal threads: ").append(U.nl());
+
+ b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
+ b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
+ b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
+ b.append(" Socket timeout worker: ").append(threadStatus(adapter.sockTimeoutWorker)).append(U.nl());
+ b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
+ b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
+
+ b.append(U.nl());
+
+ b.append("Socket readers: ").append(U.nl());
+
+ for (SocketReader rdr : readers)
+ b.append(" ").append(rdr).append(U.nl());
+
+ b.append(U.nl());
+
+ b.append("In-memory log messages: ").append(U.nl());
+
+ for (String msg : debugLog)
+ b.append(" ").append(msg).append(U.nl());
+
+ b.append(U.nl());
+
+ b.append("Leaving nodes: ").append(U.nl());
+
+ for (TcpDiscoveryNode node : leavingNodes)
+ b.append(" ").append(node.id()).append(U.nl());
+
+ b.append(U.nl());
+
+ b.append("Failed nodes: ").append(U.nl());
+
+ for (TcpDiscoveryNode node : failedNodes)
+ b.append(" ").append(node.id()).append(U.nl());
+
+ b.append(U.nl());
+
+ b.append("Stats: ").append(adapter.stats).append(U.nl());
+
+ U.quietAndInfo(log, b.toString());
+ }
+ }
+
+ /**
+  * Appends a record to the in-memory debug log and trims the log from its head so
+  * that it does not grow beyond {@code debugMsgHist} entries. The record is prefixed
+  * with the current time, current thread name, local node ID and local node internal order.
+  * <p>
+  * Must be called in debug mode only (asserted).
+  *
+  * @param msg Message.
+  */
+ private void debugLog(String msg) {
+     assert debugMode;
+
+     String tstamp = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis()));
+
+     String rec = tstamp +
+         '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
+         "-" + locNode.internalOrder() + "] " +
+         msg;
+
+     debugLog.add(rec);
+
+     // Number of records over the limit right after the add.
+     int excess = debugLog.size() - debugMsgHist;
+
+     int polled = 0;
+
+     // Remove at most 'excess' records, re-checking the size on every step.
+     while (polled < excess && debugLog.size() > debugMsgHist) {
+         debugLog.poll();
+
+         polled++;
+     }
+ }
+
+ /**
+  * Tells whether a message is recordable in debug mode. Heartbeat, status check
+  * and discard messages are not recordable; any other message is.
+  *
+  * @param msg Message.
+  * @return {@code True} if recordable in debug mode.
+  */
+ private boolean recordable(TcpDiscoveryAbstractMessage msg) {
+     boolean excluded = msg instanceof TcpDiscoveryHeartbeatMessage ||
+         msg instanceof TcpDiscoveryStatusCheckMessage ||
+         msg instanceof TcpDiscoveryDiscardMessage;
+
+     return !excluded;
+ }
+
+ /**
+  * Checks if two given {@link SecurityPermissionSet} objects contain the same permissions.
+  * The sets are considered equal when their "default allow all" flags match and their
+  * system, cache and task permissions are equal (compared with {@code F.eqNotOrdered}).
+  *
+  * @param locPerms The first set of permissions.
+  * @param rmtPerms The second set of permissions.
+  * @return {@code True} if given parameters contain the same permissions, {@code False} otherwise.
+  */
+ private boolean permissionsEqual(SecurityPermissionSet locPerms, SecurityPermissionSet rmtPerms) {
+     boolean sameDfltAllow = locPerms.defaultAllowAll() == rmtPerms.defaultAllowAll();
+
+     // Permission groups are compared in the order: system, cache, task.
+     if (!F.eqNotOrdered(rmtPerms.systemPermissions(), locPerms.systemPermissions()))
+         return false;
+
+     if (!F.eqNotOrdered(rmtPerms.cachePermissions(), locPerms.cachePermissions()))
+         return false;
+
+     if (!F.eqNotOrdered(rmtPerms.taskPermissions(), locPerms.taskPermissions()))
+         return false;
+
+     return sameDfltAllow;
+ }
+
+ /**
+ * Removes both the node metrics and the cache metrics of the given node from
+ * the heartbeat message.
+ *
+ * @param msg Message.
+ * @param nodeId Node ID.
+ */
+ private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
+ msg.removeMetrics(nodeId);
+ msg.removeCacheMetrics(nodeId);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The string is built by {@code S.toString} for {@code ServerImpl}.
+ */
+ @Override public String toString() {
+ return S.toString(ServerImpl.class, this);
+ }
+
+ /**
+ * Thread that sends heartbeats.
+ */
+ private class HeartbeatsSender extends IgniteSpiThread {
+ /**
+ * Constructor.
+ */
+ private HeartbeatsSender() {
+ super(adapter.ignite().name(), "tcp-disco-hb-sender", log);
+
+ setPriority(adapter.threadPri);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("BusyWait")
+ @Override protected void body() throws InterruptedException {
+ while (!isLocalNodeCoordinator())
+ Thread.sleep(1000);
+
+ if (log.isDebugEnabled())
+ log.debug("Heartbeats sender has been started.");
+
+ while (!isInterrupted()) {
+ if (spiStateCopy() != CONNECTED) {
+ if (log.isDebugEnabled())
+ log.debug("Stopping heartbeats sender (SPI is not connected to topology).");
+
+ return;
+ }
+
+ TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId());
+
+ msg.verify(getLocalNodeId());
+
+ msgWorker.addMessage(msg);
+
+ Thread.sleep(adapter.hbFreq);
+ }
+ }
+ }
+
+ /**
+ * Thread that sends status check messages to next node if local node has not
+ * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
+ * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
+ * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
+ * <p>
+ * The message is not written to a socket here: it is handed to the message worker
+ * via {@code msgWorker.addMessage}. The thread stops as soon as the SPI state is
+ * found to be other than {@code CONNECTED}.
+ */
+ private class CheckStatusSender extends IgniteSpiThread {
+ /**
+ * Constructor. Names the thread and applies the configured thread priority.
+ */
+ private CheckStatusSender() {
+ super(adapter.ignite().name(), "tcp-disco-status-check-sender", log);
+
+ setPriority(adapter.threadPri);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Each iteration sleeps until {@code checkTimeout} has elapsed since the later of
+ * the local node's last update time and the last status check sent by this thread,
+ * then sends a status check unless the local node was updated in the meantime or
+ * the ring has no remote nodes.
+ */
+ @SuppressWarnings("BusyWait")
+ @Override protected void body() throws InterruptedException {
+ if (log.isDebugEnabled())
+ log.debug("Status check sender has been started.");
+
+ // Up to 'maxMissedHbs' heartbeat periods may pass without an update. Add 50 ms to avoid false alarm.
+ long checkTimeout = (long)adapter.maxMissedHbs * adapter.hbFreq + 50;
+
+ long lastSent = 0;
+
+ while (!isInterrupted()) {
+ // 1. Determine timeout.
+ if (lastSent < locNode.lastUpdateTime())
+ lastSent = locNode.lastUpdateTime();
+
+ long timeout = (lastSent + checkTimeout) - U.currentTimeMillis();
+
+ if (timeout > 0)
+ Thread.sleep(timeout);
+
+ // 2. Check if SPI is still connected.
+ if (spiStateCopy() != CONNECTED) {
+ if (log.isDebugEnabled())
+ log.debug("Stopping status check sender (SPI is not connected to topology).");
+
+ return;
+ }
+
+ // 3. Was there an update?
+ if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping status check send " +
+ "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) +
+ ", hasRmts=" + ring.hasRemoteNodes() + ']');
+
+ continue;
+ }
+
+ // 4. Send status check message.
+ lastSent = U.currentTimeMillis();
+
+ msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+ }
+ }
+ }
+
+ /**
+ * Thread that cleans IP finder and keeps it in the correct state, unregistering
+ * addresses of the nodes that have left the topology.
+ * <p>
+ * The thread is active only while the local node is coordinator (other iterations are
+ * skipped) and will clean IP finder
+ * if and only if {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()} is {@code true}.
+ */
+ private class IpFinderCleaner extends IgniteSpiThread {
+ /**
+ * Constructor. Names the thread and applies the configured thread priority.
+ */
+ private IpFinderCleaner() {
+ super(adapter.ignite().name(), "tcp-disco-ip-finder-cleaner", log);
+
+ setPriority(adapter.threadPri);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Sleeps for {@code adapter.ipFinderCleanFreq} between iterations. Iterations on a
+ * non-coordinator node do nothing; on a coordinator the thread stops if the SPI is
+ * no longer {@code CONNECTED}, otherwise it cleans the IP finder when it is shared.
+ */
+ @SuppressWarnings("BusyWait")
+ @Override protected void body() throws InterruptedException {
+ if (log.isDebugEnabled())
+ log.debug("IP finder cleaner has been started.");
+
+ while (!isInterrupted()) {
+ Thread.sleep(adapter.ipFinderCleanFreq);
+
+ if (!isLocalNodeCoordinator())
+ continue;
+
+ if (spiStateCopy() != CONNECTED) {
+ if (log.isDebugEnabled())
+ log.debug("Stopping IP finder cleaner (SPI is not connected to topology).");
+
+ return;
+ }
+
+ if (adapter.ipFinder.isShared())
+ cleanIpFinder();
+ }
+ }
+
+ /**
+ * Cleans IP finder.
+ * <p>
+ * Registered addresses that do not belong to any non-client node of the ring and
+ * that cannot be pinged are unregistered; addresses of ring nodes that are missing
+ * from the IP finder are registered again. Ping results are cached per address for
+ * the duration of one call. An {@link IgniteSpiException} is logged, not rethrown.
+ */
+ private void cleanIpFinder() {
+ assert adapter.ipFinder.isShared();
+
+ try {
+ // Addresses that belong to nodes in topology (client nodes contribute no addresses).
+ Collection<InetSocketAddress> currAddrs = F.flatCollections(
+ F.viewReadOnly(
+ ring.allNodes(),
+ new C1<TcpDiscoveryNode, Collection<InetSocketAddress>>() {
+ @Override public Collection<InetSocketAddress> apply(TcpDiscoveryNode node) {
+ return !node.isClient() ? adapter.getNodeAddresses(node) :
+ Collections.<InetSocketAddress>emptyList();
+ }
+ }
+ )
+ );
+
+ // Addresses registered in IP finder.
+ Collection<InetSocketAddress> regAddrs = adapter.registeredAddresses();
+
+ // Remove all addresses that belong to alive nodes, leave dead-node addresses.
+ Collection<InetSocketAddress> rmvAddrs = F.view(
+ regAddrs,
+ F.notContains(currAddrs),
+ new P1<InetSocketAddress>() {
+ private final Map<InetSocketAddress, Boolean> pingResMap =
+ new HashMap<>();
+
+ @Override public boolean apply(InetSocketAddress addr) {
+ Boolean res = pingResMap.get(addr);
+
+ if (res == null) {
+ try {
+ res = pingNode(addr, null).get1() != null;
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to ping node [addr=" + addr +
+ ", err=" + e.getMessage() + ']');
+
+ res = false;
+ }
+ finally {
+ pingResMap.put(addr, res);
+ }
+ }
+
+ return !res;
+ }
+ }
+ );
+
+ // Unregister dead-node addresses.
+ if (!rmvAddrs.isEmpty()) {
+ adapter.ipFinder.unregisterAddresses(rmvAddrs);
+
+ if (log.isDebugEnabled())
+ log.debug("Unregistered addresses from IP finder: " + rmvAddrs);
+ }
+
+ // Addresses that were removed by mistake (e.g. on segmentation).
+ Collection<InetSocketAddress> missingAddrs = F.view(
+ currAddrs,
+ F.notContains(regAddrs)
+ );
+
+ // Re-register missing addresses.
+ if (!missingAddrs.isEmpty()) {
+ adapter.ipFinder.registerAddresses(missingAddrs);
+
+ if (log.isDebugEnabled())
+ log.debug("Registered missing addresses in IP finder: " + missingAddrs);
+ }
+ }
+ catch (IgniteSpiException e) {
+ LT.error(log, e, "Failed to clean IP finder up.");
+ }
+ }
+ }
+
+ /**
+  * Pending messages container: a FIFO queue of messages plus the ID of the
+  * last discarded message.
+  */
+ private static class PendingMessages {
+     /** Queue size above which {@link #add(TcpDiscoveryAbstractMessage)} starts trimming. */
+     private static final int MAX = 1024;
+
+     /** Pending messages. */
+     private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+
+     /** Discarded message ID. */
+     private IgniteUuid discardId;
+
+     /**
+      * Adds pending message and shrinks queue if it exceeds limit. Messages are removed
+      * from the head until the queue is within the limit again or until the message
+      * with the discarded ID has been removed, whichever comes first.
+      *
+      * @param msg Message to add.
+      */
+     void add(TcpDiscoveryAbstractMessage msg) {
+         msgs.add(msg);
+
+         boolean discardedRmvd = false;
+
+         while (!discardedRmvd && msgs.size() > MAX) {
+             TcpDiscoveryAbstractMessage head = msgs.poll();
+
+             assert head != null;
+
+             discardedRmvd = head.id().equals(discardId);
+         }
+     }
+
+     /**
+      * Gets messages starting from provided ID (exclusive). If such
+      * message is not found, {@code null} is returned (this indicates
+      * a failure condition when it was already removed from queue).
+      *
+      * @param lastMsgId Last message ID.
+      * @return Collection of messages.
+      */
+     @Nullable Collection<TcpDiscoveryAbstractMessage> messages(IgniteUuid lastMsgId) {
+         assert lastMsgId != null;
+
+         Collection<TcpDiscoveryAbstractMessage> tail = new ArrayList<>(msgs.size());
+
+         Iterator<TcpDiscoveryAbstractMessage> it = msgs.iterator();
+
+         boolean found = false;
+
+         // Advance past the message with the requested ID.
+         while (!found && it.hasNext())
+             found = it.next().id().equals(lastMsgId);
+
+         if (!found)
+             return null;
+
+         // Everything after it is returned.
+         while (it.hasNext())
+             tail.add(it.next());
+
+         return tail;
+     }
+
+     /**
+      * Resets pending messages: the queue content is replaced with the given
+      * messages (if any) and the discarded message ID is overwritten.
+      *
+      * @param msgs Message.
+      * @param discardId Discarded message ID.
+      */
+     void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+         Queue<TcpDiscoveryAbstractMessage> queue = this.msgs;
+
+         queue.clear();
+
+         if (msgs != null)
+             queue.addAll(msgs);
+
+         this.discardId = discardId;
+     }
+
+     /**
+      * Clears pending messages and forgets the discarded message ID.
+      */
+     void clear() {
+         msgs.clear();
+
+         discardId = null;
+     }
+
+     /**
+      * Discards message with provided ID and all before it. Only the ID is
+      * remembered here; the queue itself is not modified by this call.
+      *
+      * @param id Discarded message ID.
+      */
+     void discard(IgniteUuid id) {
+         discardId = id;
+     }
+ }
+
+ /**
+ * Message worker thread for messages processing.
+ */
+ private class RingMessageWorker extends MessageWorkerAdapter {
+ /** Next node in the ring that messages are sent to. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private TcpDiscoveryNode next;
+
+ /** Pending messages (re-sent to the next node on failure or when forced). */
+ private final PendingMessages pendingMsgs = new PendingMessages();
+
+ /** Last message that updated topology. */
+ private TcpDiscoveryAbstractMessage lastMsg;
+
+ /** Force pending messages send (expected to be set only while a node left message is sent). */
+ private boolean forceSndPending;
+
+ /** Socket connected to the next node, or {@code null} if there is no open connection. */
+ private Socket sock;
+
+ /**
+ * Creates the worker, passing {@code "tcp-disco-msg-worker"} to the parent constructor.
+ */
+ protected RingMessageWorker() {
+ super("tcp-disco-msg-worker");
+ }
+
+ /**
+ * Dispatches a message to the handler that matches its runtime type. The type checks
+ * are performed in a fixed order and the first match wins; an unknown type trips an
+ * assertion.
+ * <p>
+ * Statistics are notified before and after processing. The "finished" notification
+ * is not issued if a handler throws, since there is no {@code finally} here.
+ * <p>
+ * NOTE(review): the client heartbeat check precedes the heartbeat check - presumably
+ * because the former type extends the latter; confirm before reordering.
+ *
+ * @param msg Message to process.
+ */
+ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+ if (log.isDebugEnabled())
+ log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+
+ if (debugMode)
+ debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+
+ adapter.stats.onMessageProcessingStarted(msg);
+
+ if (msg instanceof TcpDiscoveryJoinRequestMessage)
+ processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryClientReconnectMessage)
+ processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryNodeAddedMessage)
+ processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+ processNodeAddFinishedMessage((TcpDiscoveryNodeAddFinishedMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryNodeLeftMessage)
+ processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryNodeFailedMessage)
+ processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
+ processClientHeartbeatMessage((TcpDiscoveryClientHeartbeatMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryHeartbeatMessage)
+ processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryStatusCheckMessage)
+ processStatusCheckMessage((TcpDiscoveryStatusCheckMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryDiscardMessage)
+ processDiscardMessage((TcpDiscoveryDiscardMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryCustomEventMessage)
+ processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+
+ else if (msg instanceof TcpDiscoveryClientPingRequest)
+ processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
+
+ else
+ assert false : "Unknown message type: " + msg.getClass().getSimpleName();
+
+ adapter.stats.onMessageProcessingFinished(msg);
+ }
+
+ /**
+ * Sends message across the ring.
+ *
+ * @param msg Message to send
+ */
+ @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"})
+ private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
+ assert msg != null;
+
+ assert ring.hasRemoteNodes();
+
+ for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : adapter.sendMsgLsnrs)
+ msgLsnr.apply(msg);
+
+ if (redirectToClients(msg)) {
+ byte[] marshalledMsg = null;
+
+ for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
+ // Send a clone to client to avoid ConcurrentModificationException
+ TcpDiscoveryAbstractMessage msgClone;
+
+ try {
+ if (marshalledMsg == null)
+ marshalledMsg = adapter.marsh.marshal(msg);
+
+ msgClone = adapter.marsh.unmarshal(marshalledMsg, null);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal message: " + msg, e);
+
+ msgClone = msg;
+ }
+
+ clientMsgWorker.addMessage(msgClone);
+ }
+ }
+
+ Collection<TcpDiscoveryNode> failedNodes;
+
+ TcpDiscoverySpiState state;
+
+ synchronized (mux) {
+ failedNodes = U.arrayList(ServerImpl.this.failedNodes);
+
+ state = spiState;
+ }
+
+ Collection<Throwable> errs = null;
+
+ boolean sent = false;
+
+ boolean searchNext = true;
+
+ UUID locNodeId = getLocalNodeId();
+
+ while (true) {
+ if (searchNext) {
+ TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
+
+ if (newNext == null) {
+ if (log.isDebugEnabled())
+ log.debug("No next node in topology.");
+
+ if (debugMode)
+ debugLog("No next node in topology.");
+
+ if (ring.hasRemoteNodes()) {
+ msg.senderNodeId(locNodeId);
+
+ addMessage(msg);
+ }
+
+ break;
+ }
+
+ if (!newNext.equals(next)) {
+ if (log.isDebugEnabled())
+ log.debug("New next node [newNext=" + newNext + ", formerNext=" + next +
+ ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
+
+ if (debugMode)
+ debugLog("New next node [newNext=" + newNext + ", formerNext=" + next +
+ ", ring=" + ring + ", failedNodes=" + failedNodes + ']');
+
+ U.closeQuiet(sock);
+
+ sock = null;
+
+ next = newNext;
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Next node remains the same [nextId=" + next.id() +
+ ", nextOrder=" + next.internalOrder() + ']');
+ }
+
+ // Flag that shows whether next node exists and accepts incoming connections.
+ boolean nextNodeExists = sock != null;
+
+ final boolean sameHost = U.sameMacs(locNode, next);
+
+ List<InetSocketAddress> localNodeAddresses = U.arrayList(locNode.socketAddresses());
+
+ addr: for (InetSocketAddress addr : adapter.getNodeAddresses(next, sameHost)) {
+ long ackTimeout0 = adapter.ackTimeout;
+
+ if (localNodeAddresses.contains(addr)){
+ if (log.isDebugEnabled())
+ log.debug("Skip to send message to the local node (probably remote node has the same " +
+ "loopback address that local node): " + addr);
+
+ continue;
+ }
+
+ for (int i = 0; i < adapter.reconCnt; i++) {
+ if (sock == null) {
+ nextNodeExists = false;
+
+ boolean success = false;
+
+ boolean openSock = false;
+
+ // Restore ring.
+ try {
+ long tstamp = U.currentTimeMillis();
+
+ sock = adapter.openSocket(addr);
+
+ openSock = true;
+
+ // Handshake.
+ writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+
+ TcpDiscoveryHandshakeResponse res = adapter.readMessage(sock, null, ackTimeout0);
+
+ if (locNodeId.equals(res.creatorNodeId())) {
+ if (log.isDebugEnabled())
+ log.debug("Handshake response from local node: " + res);
+
+ U.closeQuiet(sock);
+
+ sock = null;
+
+ break;
+ }
+
+ adapter.stats.onClientSocketInitialized(U.currentTimeMillis() - tstamp);
+
+ UUID nextId = res.creatorNodeId();
+
+ long nextOrder = res.order();
+
+ if (!next.id().equals(nextId)) {
+ // Node with different ID has bounded to the same port.
+ if (log.isDebugEnabled())
+ log.debug("Failed to restore ring because next node ID received is not as " +
+ "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
+
+ if (debugMode)
+ debugLog("Failed to restore ring because next node ID received is not as " +
+ "expected [expectedId=" + next.id() + ", rcvdId=" + nextId + ']');
+
+ break;
+ }
+ else {
+ // ID is as expected. Check node order.
+ if (nextOrder != next.internalOrder()) {
+ // Is next currently being added?
+ boolean nextNew = (msg instanceof TcpDiscoveryNodeAddedMessage &&
+ ((TcpDiscoveryNodeAddedMessage)msg).node().id().equals(nextId));
+
+ if (!nextNew) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to restore ring because next node order received " +
+ "is not as expected [expected=" + next.internalOrder() +
+ ", rcvd=" + nextOrder + ", id=" + next.id() + ']');
+
+ if (debugMode)
+ debugLog("Failed to restore ring because next node order received " +
+ "is not as expected [expected=" + next.internalOrder() +
+ ", rcvd=" + nextOrder + ", id=" + next.id() + ']');
+
+ break;
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Initialized connection with next node: " + next.id());
+
+ if (debugMode)
+ debugLog("Initialized connection with next node: " + next.id());
+
+ errs = null;
+
+ success = true;
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (errs == null)
+ errs = new ArrayList<>();
+
+ errs.add(e);
+
+ if (log.isDebugEnabled())
+ U.error(log, "Failed to connect to next node [msg=" + msg
+ + ", err=" + e.getMessage() + ']', e);
+
+ onException("Failed to connect to next node [msg=" + msg + ", err=" + e + ']', e);
+
+ if (!openSock)
+ break; // Don't retry if we can not establish connection.
+
+ if (e instanceof SocketTimeoutException ||
+ X.hasCause(e, SocketTimeoutException.class)) {
+ ackTimeout0 *= 2;
+
+ if (!checkAckTimeout(ackTimeout0))
+ break;
+ }
+
+ continue;
+ }
+ finally {
+ if (!success) {
+ U.closeQuiet(sock);
+
+ sock = null;
+ }
+ else
+ // Next node exists and accepts incoming messages.
+ nextNodeExists = true;
+ }
+ }
+
+ try {
+ boolean failure;
+
+ synchronized (mux) {
+ failure = ServerImpl.this.failedNodes.size() < failedNodes.size();
+ }
+
+ assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;
+
+ if (failure || forceSndPending) {
+ if (log.isDebugEnabled())
+ log.debug("Pending messages will be sent [failure=" + failure +
+ ", forceSndPending=" + forceSndPending + ']');
+
+ if (debugMode)
+ debugLog("Pending messages will be sent [failure=" + failure +
+ ", forceSndPending=" + forceSndPending + ']');
+
+ boolean skip = pendingMsgs.discardId != null;
+
+ for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+ if (skip) {
+ if (pendingMsg.id().equals(pendingMsgs.discardId))
+ skip = false;
+
+ continue;
+ }
+
+ long tstamp = U.currentTimeMillis();
+
+ prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
+ pendingMsgs.discardId);
+
+ try {
+ writeToSocket(sock, pendingMsg);
+ }
+ finally {
+ clearNodeAddedMessage(pendingMsg);
+ }
+
+ adapter.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
+
+ int res = adapter.readReceipt(sock, ackTimeout0);
+
+ if (log.isDebugEnabled())
+ log.debug("Pending message has been sent to next node [msg=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ ", res=" + res + ']');
+
+ if (debugMode)
+ debugLog("Pending message has been sent to next node [msg=" + msg.id() +
+ ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+ ", res=" + res + ']');
+ }
+ }
+
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+
+ try {
+ long tstamp = U.currentTimeMillis();
+
+ writeToSocket(sock, msg);
+
+ adapter.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
+
+ int res = adapter.readReceipt(sock, ackTimeout0);
+
+ if (log.isDebugEnabled())
+ log.debug("Message has been sent to next node [msg=" + msg +
+ ", next=" + next.id() +
+ ", res=" + res + ']');
+
+ if (debugMode)
+ debugLog("Message has been sent to next node [msg=" + msg +
+ ", next=" + next.id() +
+ ", res=" + res + ']');
+ }
+ finally {
+ clearNodeAddedMessage(msg);
+ }
+
+ registerPendingMessage(msg);
+
+ sent = true;
+
+ break addr;
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (errs == null)
+ errs = new ArrayList<>();
+
+ errs.add(e);
+
+ if (log.isDebugEnabled())
+ U.error(log, "Failed to send message to next node [next=" + next.id() + ", msg=" + msg +
+ ", err=" + e + ']', e);
+
+ onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
+ e);
+
+ if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+ ackTimeout0 *= 2;
+
+ if (!checkAckTimeout(ackTimeout0))
+ break;
+ }
+ }
+ finally {
+ forceSndPending = false;
+
+ if (!sent) {
+ U.closeQuiet(sock);
+
+ sock = null;
+
+ if (log.isDebugEnabled())
+ log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
+ ", i=" + i + ']');
+ }
+ }
+ } // Try to reconnect.
+ } // Iterating node's addresses.
+
+ if (!sent) {
+ if (!failedNodes.contains(next)) {
+ failedNodes.add(next);
+
+ if (state == CONNECTED) {
+ Exception err = errs != null ?
+ U.exceptionWithSuppressed("Failed to send message to next node [msg=" + msg +
+ ", next=" + U.toShortString(next) + ']', errs) :
+ null;
+
+ // If node existed on connection initialization we should check
+ // whether it has not gone yet.
+ if (nextNodeExists && pingNode(next))
+ U.error(log, "Failed to send message to next node [msg=" + msg +
+ ", next=" + next + ']', err);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to send message to next node [msg=" + msg + ", next=" + next +
+ ", errMsg=" + (err != null ? err.getMessage() : "N/A") + ']');
+ }
+ }
+
+ if (msg instanceof TcpDiscoveryStatusCheckMessage) {
+ TcpDiscoveryStatusCheckMessage msg0 = (TcpDiscoveryStatusCheckMessage)msg;
+
+ if (next.id().equals(msg0.failedNodeId())) {
+ next = null;
+
+ if (log.isDebugEnabled())
+ log.debug("Discarding status check since next node has indeed failed [next=" + next +
+ ", msg=" + msg + ']');
+
+ // Discard status check message by exiting loop and handle failure.
+ break;
+ }
+ }
+
+ next = null;
+
+ searchNext = true;
+
+ errs = null;
+ }
+ else
+ break;
+ }
+
+ synchronized (mux) {
+ failedNodes.removeAll(ServerImpl.this.failedNodes);
+ }
+
+ if (!failedNodes.isEmpty()) {
+ if (state == CONNECTED) {
+ if (!sent && log.isDebugEnabled())
+ // Message has not been sent due to some problems.
+ log.debug("Message has not been sent: " + m
<TRUNCATED>