You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:52 UTC
[29/50] [abbrv] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-3054
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f388e3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f6d19e0,50fa3bd..e743b5c
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@@ -158,14 -128,12 +158,15 @@@ import org.apache.ignite.spi.discovery.
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
+import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
- import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
+ import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@@ -205,25 -173,9 +206,24 @@@ class ServerImpl extends TcpDiscoveryIm
private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE =
IgniteProductVersion.fromString("1.5.0");
+ /** Node ID in GridNioSession. */
+ private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** ClientNioMessageWorker in GridNioSession. */
+ private static final int NIO_WORKER_META = GridNioSessionMetaKey.nextUniqueKey();
+
+ /**
+ * Number of tries to reopen ServerSocketChannel on 'SocketException: Invalid argument'.
+ * <p>This error may happen on simultaneous server nodes startup on the same JVM.</p>
+ */
+ private static final int REOPEN_SERVER_SOCKET_CHANNEL_TRIES = 3;
+
/** */
- private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
+ private IgniteThreadPoolExecutor utilityPool;
+ /** */
+ private IgniteThreadPoolExecutor nioClientProcessingPool;
+
/** Nodes ring. */
@GridToStringExclude
private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing();
@@@ -5356,16 -5444,19 +5677,20 @@@
int lastPort = spi.locPortRange == 0 ? spi.locPort : spi.locPort + spi.locPortRange - 1;
+ openServerSocketChannel();
+
+ int reopenTries = 0;
+
for (port = spi.locPort; port <= lastPort; port++) {
try {
- if (spi.isSslEnabled())
- srvrSock = spi.sslSrvSockFactory.createServerSocket(port, 0, spi.locHost);
- else
- srvrSock = new ServerSocket(port, 0, spi.locHost);
+ srvCh.bind(new InetSocketAddress(spi.locHost, port));
- if (log.isInfoEnabled())
- log.info("Successfully bound to TCP port [port=" + port + ", localHost=" + spi.locHost + ']');
+ if (log.isInfoEnabled()) {
+ log.info("Successfully bound to TCP port [port=" + port +
+ ", localHost=" + spi.locHost +
+ ", locNodeId=" + spi.ignite().configuration().getNodeId() +
+ ']');
+ }
return;
}
@@@ -5508,1877 -5525,606 +5833,1883 @@@
}
/**
- * Thread that reads messages from the socket created for incoming connections.
+ * Encapsulates ping logic for client processors.
*/
- private class SocketReader extends IgniteSpiThread {
- /** Socket to read data from. */
- private final Socket sock;
+ private class ClientMessagePinger {
+ /** */
+ final AtomicReference<GridFutureAdapter<Boolean>> pingFut = new AtomicReference<>();
/** */
- private volatile UUID nodeId;
+ private final ClientMessageProcessor proc;
/**
- * Constructor.
- *
- * @param sock Socket to read data from.
+ * @param proc Processor.
*/
- SocketReader(Socket sock) {
- super(spi.ignite().name(), "tcp-disco-sock-reader", log);
+ private ClientMessagePinger(final ClientMessageProcessor proc) {
+ this.proc = proc;
+ }
- this.sock = sock;
+ /**
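+ * @param timeoutHelper Timeout helper.
+ * @return {@code True} if the ping future completed with {@code true}; {@code false} if the node is stopping or the wait timed out.
+ * @throws InterruptedException If interrupted while waiting for the ping result.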
+ */
+ public boolean ping(final IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
+ if (spi.isNodeStopping0())
+ return false;
- setPriority(spi.threadPri);
+ GridFutureAdapter<Boolean> fut;
- spi.stats.onSocketReaderCreated();
- }
+ while (true) {
+ fut = pingFut.get();
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- UUID locNodeId = getConfiguredNodeId();
+ if (fut != null)
+ break;
- ClientMessageWorker clientMsgWrk = null;
+ fut = new GridFutureAdapter<>();
- try {
- InputStream in;
+ if (pingFut.compareAndSet(null, fut)) {
+ TcpDiscoveryPingRequest pingReq = new TcpDiscoveryPingRequest(getLocalNodeId(), proc.clientNodeId());
- try {
- // Set socket options.
- sock.setKeepAlive(true);
- sock.setTcpNoDelay(true);
+ pingReq.verify(getLocalNodeId());
- int timeout = sock.getSoTimeout();
+ proc.addMessage(pingReq);
- sock.setSoTimeout((int)spi.netTimeout);
+ break;
+ }
+ }
- for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
- connLsnr.apply(sock);
+ try {
+ return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()),
+ TimeUnit.MILLISECONDS);
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ throw new InterruptedException();
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ if (pingFut.compareAndSet(fut, null))
+ fut.onDone(false);
- int rcvBufSize = sock.getReceiveBufferSize();
+ return false;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException("Internal error: ping future cannot be done with exception", e);
+ }
+ }
- in = new BufferedInputStream(sock.getInputStream(), rcvBufSize > 0 ? rcvBufSize : 8192);
+ /**
+ * @param res Ping result.
+ */
+ void pingResult(final boolean res) {
+ final GridFutureAdapter<Boolean> fut = pingFut.getAndSet(null);
- byte[] buf = new byte[4];
- int read = 0;
+ if (fut != null)
+ fut.onDone(res);
+ }
+ }
- while (read < buf.length) {
- int r = in.read(buf, read, buf.length - read);
+ /**
+ * NIO worker state.
+ */
+ private enum WorkerState {
+ /** Initial worker state. */
+ NOT_STARTED,
- if (r >= 0)
- read += r;
- else {
- if (log.isDebugEnabled())
- log.debug("Failed to read magic header (too few bytes received) " +
- "[rmtAddr=" + sock.getRemoteSocketAddress() +
- ", locAddr=" + sock.getLocalSocketAddress() + ']');
+ /** Started but all messages are added to queue. */
+ STARTED,
- LT.warn(log, "Failed to read magic header (too few bytes received) [rmtAddr=" +
- sock.getRemoteSocketAddress() + ", locAddr=" + sock.getLocalSocketAddress() + ']');
+ /** Started and all messages are sent to client node. */
+ JOINED,
- return;
- }
- }
+ /** Worker was stopped. */
+ STOPPED
+ }
- if (!Arrays.equals(buf, U.IGNITE_HEADER)) {
- if (log.isDebugEnabled())
- log.debug("Unknown connection detected (is some other software connecting to " +
- "this Ignite port?" +
- (!spi.isSslEnabled() ? " missed SSL configuration?" : "" ) +
- ") " +
- "[rmtAddr=" + sock.getRemoteSocketAddress() +
- ", locAddr=" + sock.getLocalSocketAddress() + ']');
+ /**
+ * Non blocking client message processor.
+ */
+ private class ClientNioMessageWorker implements ClientMessageProcessor {
+ /** ID of the node served by this processor. */
+ private final UUID clientNodeId;
- LT.warn(log, "Unknown connection detected (is some other software connecting to " +
- "this Ignite port?" +
- (!spi.isSslEnabled() ? " missing SSL configuration on remote node?" : "" ) +
- ") [rmtAddr=" + sock.getInetAddress() + ']', true);
+ /** Socket connected to the client. */
+ private final Socket sock;
- return;
- }
+ /** Current session. */
+ private volatile GridNioSession ses;
- // Restore timeout.
- sock.setSoTimeout(timeout);
+ /** */
+ private volatile ClusterMetrics metrics;
- TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
+ /** */
+ private volatile ClientMessagePinger pinger;
- // Ping.
- if (msg instanceof TcpDiscoveryPingRequest) {
- if (!spi.isNodeStopping0()) {
- TcpDiscoveryPingRequest req = (TcpDiscoveryPingRequest)msg;
+ /** Client message queue. */
+ private final Deque<T2<TcpDiscoveryAbstractMessage, byte[]>> msgQueue;
- TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
+ /** Worker state. */
+ private volatile WorkerState state = WorkerState.NOT_STARTED;
- IgniteSpiOperationTimeoutHelper timeoutHelper =
- new IgniteSpiOperationTimeoutHelper(spi);
+ /** Indicates whether messages are sent to client. Only one thread can send messages to single client to keep their order. */
+ private final AtomicBoolean sending = new AtomicBoolean(false);
- if (req.clientNodeId() != null) {
- ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
+ /**
+ * @param clientNodeId Client node ID.
+ * @param sock Socket.
+ */
+ ClientNioMessageWorker(final UUID clientNodeId, final Socket sock) {
+ this.clientNodeId = clientNodeId;
+ this.sock = sock;
- if (clientWorker != null)
- res.clientExists(clientWorker.ping(timeoutHelper));
- }
+ msgQueue = new ConcurrentLinkedDeque8<>();
+ }
- spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
- }
- else if (log.isDebugEnabled())
- log.debug("Ignore ping request, node is stopping.");
+ /**
+ * Open session and start listening for client messages.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void start() throws IgniteCheckedException, SSLException {
+ if (state != WorkerState.NOT_STARTED)
+ return;
- return;
- }
+ final Map<Integer, Object> meta = new HashMap<>();
- // Handshake.
- TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
+ meta.put(NODE_ID_META, clientNodeId());
- UUID nodeId = req.creatorNodeId();
+ final SocketChannel ch = sock.getChannel();
- this.nodeId = nodeId;
+ if (spi.isSslEnabled()) {
+ assert sock instanceof NioSslSocket;
- TcpDiscoveryHandshakeResponse res =
- new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
+ // Put the engine to the meta map to allow nio server to use it:
+ meta.put(GridNioSessionMetaKey.SSL_META.ordinal(), ((NioSslSocket)sock).sslEngine);
+ }
- if (req.client())
- res.clientAck(true);
+ ses = clientNioSrv.createSession(ch, meta).get();
- spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ?
- spi.failureDetectionTimeout() : spi.getSocketTimeout());
+ ses.addMeta(NIO_WORKER_META, this);
- // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
- // the local node sends a handshake request message on the loopback address, so we get here.
- if (locNodeId.equals(nodeId)) {
- assert !req.client();
+ state = WorkerState.STARTED;
+ }
- if (log.isDebugEnabled())
- log.debug("Handshake request from local node: " + req);
+ /**
+ * Close connection to the client.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void stop() throws IgniteCheckedException {
+ nonblockingStop().get();
+ }
- return;
+ /**
+ * Close connection to the client and exit immediately - do not wait
+ * until the operation completes.
+ *
+ * @return Operation future.
+ */
+ synchronized IgniteInternalFuture nonblockingStop() {
+ if (state == WorkerState.NOT_STARTED)
+ state = WorkerState.STOPPED;
+
+ if (state == WorkerState.STOPPED)
+ return new GridFinishedFuture<>();
+
+ final IgniteInternalFuture<Object> res = ses.close().chain(new C1<IgniteInternalFuture<Boolean>, Object>() {
+ @Override public Object apply(final IgniteInternalFuture<Boolean> fut) {
+ try {
+ return fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSpiException(e.getMessage(), e);
}
+ finally {
+ U.closeQuiet(sock);
+ }
+ }
+ });
- if (req.client()) {
- ClientMessageWorker clientMsgWrk0 = new ClientMessageWorker(sock, nodeId);
+ state = WorkerState.STOPPED;
- while (true) {
- ClientMessageWorker old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
+ nioWorkers.remove(this);
- if (old == null)
- break;
+ ses.removeMeta(NIO_WORKER_META);
- if (old.isInterrupted()) {
- clientMsgWorkers.remove(nodeId, old);
+ return res;
+ }
- continue;
- }
+ /**
+ * Change state to JOINED and send all pending messages.
+ */
+ void markJoinedAndSendPendingMessages() {
+ if (state == WorkerState.STARTED) {
+ state = WorkerState.JOINED;
- old.join(500);
+ nioWorkers.add(this);
- old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
+ nioSem.release();
+ }
+ }
- if (old == null)
- break;
+ /**
+ * Send receipt to the client through the NIO session.
+ *
+ * @param receipt Receipt.
+ * @return Send future.
+ */
+ GridNioFuture<?> addReceipt(final int receipt) {
+ try {
+ return spi.sendMessage(ses, null, new byte[]{(byte) receipt});
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed marshal message, closing connection. [receipt=" + receipt + ", ses=" + ses + ']', e);
- if (log.isDebugEnabled())
- log.debug("Already have client message worker, closing connection " +
- "[locNodeId=" + locNodeId +
- ", rmtNodeId=" + nodeId +
- ", workerSock=" + old.sock +
- ", sock=" + sock + ']');
+ nonblockingStop();
- return;
- }
+ clientMsgWorkers.remove(clientNodeId, this);
- if (log.isDebugEnabled())
- log.debug("Created client message worker [locNodeId=" + locNodeId +
- ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
+ return new GridNioFinishedFuture<>(e);
+ }
+ }
- assert clientMsgWrk0 == clientMsgWorkers.get(nodeId);
+ /**
+ * Add receipt and call closure once it has been sent.
+ *
+ * @param receipt Receipt.
+ * @param clos Closure.
+ * @return Future for chaining.
+ */
+ IgniteInternalFuture<?> addReceipt(final int receipt,
+ final IgniteClosure<? super IgniteInternalFuture<?>, ?> clos) {
+ return addReceipt(receipt).chain(clos);
+ }
- clientMsgWrk = clientMsgWrk0;
- }
+ /** {@inheritDoc} */
+ @Override public void addMessage(final TcpDiscoveryAbstractMessage msg) {
+ addMessage(msg, null);
+ }
- if (log.isDebugEnabled())
- log.debug("Initialized connection with remote node [nodeId=" + nodeId +
- ", client=" + req.client() + ']');
+ /** {@inheritDoc} */
+ @Override public void addMessage(final TcpDiscoveryAbstractMessage msg, @Nullable final byte[] msgBytes) {
+ // Drop the message if the worker is stopped; otherwise queue it (high-priority messages first).
+ final WorkerState state0 = this.state;
- if (debugMode) {
- debugLog(msg, "Initialized connection with remote node [nodeId=" + nodeId +
- ", client=" + req.client() + ']');
- }
- }
- catch (IOException e) {
- if (log.isDebugEnabled())
- U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
+ if (state0 == WorkerState.STOPPED)
+ return;
- if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0())
- LT.warn(log, "Failed to initialize connection " +
- "(missing SSL configuration on remote node?) " +
- "[rmtAddr=" + sock.getInetAddress() + ']', true);
- else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed())
- && !spi.isNodeStopping0()) {
- if (U.isMacInvalidArgumentError(e))
- LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" +
- U.MAC_INVALID_ARG_MSG);
- else {
- U.error(
- log,
- "Failed to initialize connection (this can happen due to short time " +
- "network problems and can be ignored if does not affect node discovery) " +
- "[sock=" + sock + ']',
- e);
- }
- }
+ if (msg.highPriority())
+ msgQueue.addFirst(new T2<>(msg, msgBytes));
+ else
+ msgQueue.add(new T2<>(msg, msgBytes));
- onException("Caught exception on handshake [err=" + e + ", sock=" + sock + ']', e);
+ nioSem.release();
+ }
- return;
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
+ /**
+ * Send message regardless of worker state.
+ *
+ * @param msg Message to send.
+ * @param msgBytes Message bytes to send.
+ */
+ public void sendMessage(final TcpDiscoveryAbstractMessage msg, @Nullable final byte[] msgBytes) {
+ try {
+ spi.sendMessage(ses, msg, msgBytes);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed marshal message, closing connection. [msg=" + msg + ", ses=" + ses + ']', e);
- onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
+ nonblockingStop();
- if (e.hasCause(SocketTimeoutException.class))
- LT.warn(log, "Socket operation timed out on handshake " +
- "(consider increasing 'networkTimeout' configuration property) " +
- "[netTimeout=" + spi.netTimeout + ']');
+ clientMsgWorkers.remove(clientNodeId, this);
+ }
+ }
- else if (e.hasCause(ClassNotFoundException.class))
- LT.warn(log, "Failed to read message due to ClassNotFoundException " +
- "(make sure same versions of all classes are available on all nodes) " +
- "[rmtAddr=" + sock.getRemoteSocketAddress() +
- ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');
+ /** {@inheritDoc} */
+ @Override public boolean ping(final IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
+ return pinger().ping(timeoutHelper);
+ }
- // Always report marshalling problems.
- else if (e.hasCause(ObjectStreamException.class) ||
- (!sock.isClosed() && !e.hasCause(IOException.class)))
- LT.error(log, e, "Failed to initialize connection [sock=" + sock + ']');
+ /** {@inheritDoc} */
+ @Override public void pingResult(final boolean res) {
+ pinger().pingResult(res);
+ }
- return;
- }
+ /** {@inheritDoc} */
+ @Override public UUID clientNodeId() {
+ return clientNodeId;
+ }
- long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
- spi.getSocketTimeout();
+ /** {@inheritDoc} */
+ @Override public ClusterMetrics metrics() {
+ return metrics;
+ }
- while (!isInterrupted()) {
- try {
- TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
- U.resolveClassLoader(spi.ignite().configuration()));
+ /** {@inheritDoc} */
+ @Override public void metrics(final ClusterMetrics metrics) {
+ this.metrics = metrics;
+ }
- msg.senderNodeId(nodeId);
+ /**
+ * @return Pinger.
+ */
+ private ClientMessagePinger pinger() {
+ if (pinger == null) {
+ synchronized (this) {
+ if (pinger == null)
+ pinger = new ClientMessagePinger(this);
+ }
+ }
- DebugLogger debugLog = messageLogger(msg);
+ return pinger;
+ }
- if (debugLog.isDebugEnabled())
- debugLog.debug("Message has been received: " + msg);
+ /**
+ * @return Current worker state.
+ */
+ public WorkerState state() {
+ return state;
+ }
- spi.stats.onMessageReceived(msg);
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ClientNioMessageWorker.class, this);
+ }
+ }
- if (debugMode && recordable(msg))
- debugLog(msg, "Message has been received: " + msg);
- if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
- spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
- continue;
- }
- else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
- TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
+ /**
+ * Processes incoming nio client messages.
+ */
+ private class ClientNioMessageProcessor {
+ /** */
+ private final IgniteLogger log;
- if (!req.responded()) {
- boolean ok = processJoinRequestMessage(req, clientMsgWrk);
+ /**
+ * @param log Logger.
+ */
+ private ClientNioMessageProcessor(IgniteLogger log) {
+ this.log = log;
+ }
- if (clientMsgWrk != null && ok)
- continue;
- else
- // Direct join request - no need to handle this socket anymore.
- break;
- }
- }
- else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
- if (clientMsgWrk != null) {
- TcpDiscoverySpiState state = spiStateCopy();
+ /**
+ * Process client message.
+ *
+ * @param ses Nio session.
+ * @param msg0 Incoming message.
+ */
+ void processMessage(GridNioSession ses, byte[] msg0) throws IgniteCheckedException {
+ final TcpDiscoveryAbstractMessage msg = spi.marshaller().unmarshal(msg0,
+ U.resolveClassLoader(spi.ignite().configuration()));
- if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+ final UUID nodeId = getConfiguredNodeId();
+ final UUID clientNodeId = clientNodeId(ses);
- if (clientMsgWrk.getState() == State.NEW)
- clientMsgWrk.start();
+ final ClientNioMessageWorker clientMsgWrk = ses.meta(NIO_WORKER_META);
- msgWorker.addMessage(msg);
+ if (clientMsgWrk == null) {
+ if (log.isDebugEnabled())
+ log.debug("NIO Worker has been closed, drop message. [clientNodeId="
+ + clientNodeId + ", message=" + msg + "]");
- continue;
- }
- else {
- spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
+ if (ses.closeTime() == 0)
+ ses.close();
- break;
- }
- }
- }
- else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
- // Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+ return;
+ }
- boolean ignored = false;
+ msg.senderNodeId(nodeId);
- TcpDiscoverySpiState state = null;
+ if (log.isDebugEnabled())
+ log.debug("Message has been received: " + msg);
- synchronized (mux) {
- if (spiState == CONNECTING) {
- joinRes.set(msg);
+ spi.stats.onMessageReceived(msg);
- spiState = DUPLICATE_ID;
+ if (debugMode && recordable(msg))
+ debugLog(msg, "Message has been received: " + msg);
- mux.notifyAll();
- }
- else {
- ignored = true;
+ if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+ clientMsgWrk.addReceipt(RES_OK);
- state = spiState;
- }
- }
+ return;
+ }
+ else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+ final TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
- if (ignored && log.isDebugEnabled())
- log.debug("Duplicate ID message has been ignored [msg=" + msg +
- ", spiState=" + state + ']');
+ if (!req.responded()) {
+ final TcpDiscoverySpiState state = spiStateCopy();
- continue;
- }
- else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
- // Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+ if (state == CONNECTED) {
+ clientMsgWrk.addReceipt(RES_OK, new CX1<IgniteInternalFuture<?>, Object>() {
+ private static final long serialVersionUID = 0L;
- boolean ignored = false;
+ @Override public Object applyx(
+ final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
+ req.responded(true);
- TcpDiscoverySpiState state = null;
+ msgWorker.addMessage(req);
- synchronized (mux) {
- if (spiState == CONNECTING) {
- joinRes.set(msg);
+ clientMsgWrk.markJoinedAndSendPendingMessages();
- spiState = AUTH_FAILED;
+ return null;
+ }
+ });
- mux.notifyAll();
- }
- else {
- ignored = true;
+ return;
+ }
+ else {
+ spi.stats.onMessageProcessingStarted(req);
- state = spiState;
- }
- }
+ final Integer res;
- if (ignored && log.isDebugEnabled())
- log.debug("Auth failed message has been ignored [msg=" + msg +
- ", spiState=" + state + ']');
+ final SocketAddress rmtAddr = clientMsgWrk.sock.getRemoteSocketAddress();
- continue;
+ if (state == CONNECTING) {
+ if (noResAddrs.contains(rmtAddr) ||
+ getLocalNodeId().compareTo(req.creatorNodeId()) < 0)
+ // Remote node has not responded to join request or loses UUID race.
+ res = RES_WAIT;
+ else
+ // Remote node responded to join request and wins UUID race.
+ res = RES_CONTINUE_JOIN;
}
- else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
- // Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+ else
+ // Local node is stopping. Remote node should try next one.
+ res = RES_CONTINUE_JOIN;
- boolean ignored = false;
+ clientMsgWrk.addReceipt(res, new CX1<IgniteInternalFuture<?>, Object>() {
+ private static final long serialVersionUID = 0L;
- TcpDiscoverySpiState state = null;
+ @Override public Object applyx(
+ final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Responded to join request message [msg=" + req + ", res=" + res + ']');
- synchronized (mux) {
- if (spiState == CONNECTING) {
- joinRes.set(msg);
+ fromAddrs.addAll(req.node().socketAddresses());
- spiState = CHECK_FAILED;
+ spi.stats.onMessageProcessingFinished(req);
- mux.notifyAll();
- }
- else {
- ignored = true;
+ clientMsgWrk.nonblockingStop();
- state = spiState;
- }
+ clientMsgWorkers.remove(clientMsgWrk.clientNodeId(), clientMsgWrk);
+
+ return null;
}
+ });
- if (ignored && log.isDebugEnabled())
- log.debug("Check failed message has been ignored [msg=" + msg +
- ", spiState=" + state + ']');
+ return;
+ }
- continue;
+ }
+ }
+ else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+ final TcpDiscoverySpiState state = spiStateCopy();
+
+ if (state == CONNECTED) {
+ clientMsgWrk.addReceipt(RES_OK, new CX1<IgniteInternalFuture<?>, Object>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public Object applyx(
+ final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
+ msgWorker.addMessage(msg);
+
+ clientMsgWrk.markJoinedAndSendPendingMessages();
+
+ return null;
}
- else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
- // Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+ });
+ }
+ else {
+ clientMsgWrk.addReceipt(RES_CONTINUE_JOIN, new CX1<IgniteInternalFuture<?>, Object>() {
+ private static final long serialVersionUID = 0L;
- boolean ignored = false;
+ @Override public Object applyx(
+ final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
+ clientMsgWrk.nonblockingStop();
- TcpDiscoverySpiState state = null;
+ clientMsgWorkers.remove(clientMsgWrk.clientNodeId(), clientMsgWrk);
- synchronized (mux) {
- if (spiState == CONNECTING) {
- joinRes.set(msg);
+ return null;
+ }
+ });
+ }
+ }
+ else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
+ // Send receipt back.
+ clientMsgWrk.addReceipt(RES_OK, createStateChangeClosure(msg, DUPLICATE_ID));
- spiState = LOOPBACK_PROBLEM;
+ return;
+ }
+ else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
+ // Send receipt back.
+ clientMsgWrk.addReceipt(RES_OK, createStateChangeClosure(msg, AUTH_FAILED));
- mux.notifyAll();
- }
- else {
- ignored = true;
+ return;
+ }
+ else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
+ // Send receipt back.
+ clientMsgWrk.addReceipt(RES_OK, createStateChangeClosure(msg, CHECK_FAILED));
- state = spiState;
- }
- }
+ return;
+ }
+ else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
+ // Send receipt back.
+ clientMsgWrk.addReceipt(RES_OK, createStateChangeClosure(msg, LOOPBACK_PROBLEM));
- if (ignored && log.isDebugEnabled())
- log.debug("Loopback problem message has been ignored [msg=" + msg +
- ", spiState=" + state + ']');
+ return;
+ }
+ else if (msg instanceof TcpDiscoveryPingResponse) {
+ assert msg.client() : msg;
- continue;
+ final ClientMessageProcessor clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
+
+ if (clientWorker != null)
+ clientWorker.pingResult(true);
+
+ return;
+ }
+
+ TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null;
+
+ if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
+ heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg;
+ else
+ msgWorker.addMessage(msg);
+
+ final UUID locNodeId = getConfiguredNodeId();
+
+ // Send receipt back.
+ final TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id());
+
+ ack.verify(locNodeId);
+
+ clientMsgWrk.addMessage(ack, null);
+
+ if (heartbeatMsg != null)
+ clientMsgWrk.metrics(heartbeatMsg.metrics());
+ }
+
+ /**
+ * @param msg Discovery message.
+ * @param newState New state.
+ * @return Closure that changes state to new one.
+ */
+ private CX1<IgniteInternalFuture<?>, Object> createStateChangeClosure(final TcpDiscoveryAbstractMessage msg,
+ final TcpDiscoverySpiState newState) {
+ return new CX1<IgniteInternalFuture<?>, Object>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public Object applyx(
+ final IgniteInternalFuture<?> fut) throws IgniteCheckedException {
+ boolean ignored = false;
+
+ TcpDiscoverySpiState state = null;
+
+ synchronized (mux) {
+ if (spiState == CONNECTING) {
+ joinRes.set(msg);
+
+ spiState = newState;
+
+ mux.notifyAll();
}
- if (msg instanceof TcpDiscoveryPingResponse) {
- assert msg.client() : msg;
+ else {
+ ignored = true;
- ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
+ state = spiState;
+ }
+ }
- if (clientWorker != null)
- clientWorker.pingResult(true);
+ if (ignored && log.isDebugEnabled())
+ log.debug("Duplicate ID message has been ignored [msg=" + msg +
+ ", spiState=" + state + ']');
- continue;
+ return null;
+ }
+ };
+ }
+
+ /**
+ * @param ses Session.
+ * @return Client node ID.
+ */
+ private UUID clientNodeId(final GridNioSession ses) {
+ return ses.meta(NODE_ID_META);
+ }
+ }
+
+ /**
+ * Thread that reads messages from the socket created for incoming connections.
+ */
+ private class SocketReader extends IgniteSpiThread {
+ /** Socket to read data from. */
+ private final Socket sock;
+
+ /** */
+ private volatile UUID nodeId;
+
+ /** Flag indicating that client is processed by NIO server. */
+ private volatile boolean nioClient;
+
+ /**
+ * Constructor.
+ *
+ * @param sock Socket to read data from.
+ */
+ SocketReader(Socket sock) {
+ super(spi.ignite().name(), "tcp-disco-sock-reader", log);
+
+ this.sock = sock;
+
+ setPriority(spi.threadPri);
+
+ spi.stats.onSocketReaderCreated();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException {
+ UUID locNodeId = getConfiguredNodeId();
+
+ ClientMessageProcessor clientMsgWrk = null;
+
+ try {
+ InputStream in;
+
+ try {
+ // Set socket options.
+ sock.setKeepAlive(true);
+ sock.setTcpNoDelay(spi.getTcpNodelay());
+
+ int timeout = sock.getSoTimeout();
+
+ sock.setSoTimeout((int)spi.netTimeout);
+
+ for (IgniteInClosure<Socket> connLsnr : spi.incomeConnLsnrs)
+ connLsnr.apply(sock);
+
+ int rcvBufSize = sock.getReceiveBufferSize();
+
+ in = new BufferedInputStream(sock.getInputStream(), rcvBufSize > 0 ? rcvBufSize : 8192);
+
+ byte[] buf = new byte[4];
+ int read = 0;
+
+ while (read < buf.length) {
+ int r = in.read(buf, read, buf.length - read);
+
+ if (r >= 0)
+ read += r;
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Failed to read magic header (too few bytes received) " +
+ "[rmtAddr=" + sock.getRemoteSocketAddress() +
+ ", locAddr=" + sock.getLocalSocketAddress() + ']');
+
+ LT.warn(log, "Failed to read magic header (too few bytes received) [rmtAddr=" +
+ sock.getRemoteSocketAddress() + ", locAddr=" + sock.getLocalSocketAddress() + ']');
+
+ return;
}
+ }
- TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null;
+ if (!Arrays.equals(buf, U.IGNITE_HEADER)) {
+ if (log.isDebugEnabled())
+ log.debug("Unknown connection detected (is some other software connecting to " +
+ "this Ignite port?" +
+ (!spi.isSslEnabled() ? " missed SSL configuration?" : "" ) +
+ ") " +
+ "[rmtAddr=" + sock.getRemoteSocketAddress() +
+ ", locAddr=" + sock.getLocalSocketAddress() + ']');
- if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
- heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg;
- else
- msgWorker.addMessage(msg);
+ LT.warn(log, "Unknown connection detected (is some other software connecting to " +
+ "this Ignite port?" +
+ (!spi.isSslEnabled() ? " missing SSL configuration on remote node?" : "" ) +
+ ") [rmtAddr=" + sock.getInetAddress() + ']', true);
- // Send receipt back.
- if (clientMsgWrk != null) {
- TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id());
+ return;
+ }
+
+ // Restore timeout.
+ sock.setSoTimeout(timeout);
+
+ TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
+
+ // Ping.
+ if (msg instanceof TcpDiscoveryPingRequest) {
+ if (!spi.isNodeStopping0()) {
+ TcpDiscoveryPingRequest req = (TcpDiscoveryPingRequest)msg;
+
+ TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
+
+ IgniteSpiOperationTimeoutHelper timeoutHelper =
+ new IgniteSpiOperationTimeoutHelper(spi);
+
+ if (req.clientNodeId() != null) {
+ ClientMessageProcessor clientWorker = clientMsgWorkers.get(req.clientNodeId());
+
+ if (clientWorker != null)
+ res.clientExists(clientWorker.ping(timeoutHelper));
+ }
+
+ spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Ignore ping request, node is stopping.");
+
+ return;
+ }
+
+ // Handshake.
+ TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
+
+ UUID nodeId = req.creatorNodeId();
+
+ this.nodeId = nodeId;
+
+ TcpDiscoveryHandshakeResponse res =
+ new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
+
+ boolean asyncMode = false;
+
+ if (req.client()) {
+ res.clientAck(true);
+
+ // If client proposes async mode accept it.
+ if (req.asyncMode()) {
+ res.asyncMode(true);
+
+ asyncMode = true;
+ }
+ }
+
+ // It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
+ // the local node sends a handshake request message on the loopback address, so we get here.
+ if (locNodeId.equals(nodeId)) {
+ assert !req.client();
+
+ if (log.isDebugEnabled())
+ log.debug("Handshake request from local node: " + req);
+
+ return;
+ }
+
+ if (req.client()) {
+ ClientMessageProcessor clientProc = asyncMode
+ ? new ClientNioMessageWorker(nodeId, sock)
+ : new ClientMessageWorker(sock, nodeId);
+
+ while (true) {
+ ClientMessageProcessor old = clientMsgWorkers.putIfAbsent(nodeId, clientProc);
+
+ if (old == null)
+ break;
+
+ if (old instanceof ClientMessageWorker) {
+ ClientMessageWorker oldWrk = (ClientMessageWorker)old;
+
+ if (oldWrk.isInterrupted()) {
+ clientMsgWorkers.remove(nodeId, old);
+
+ continue;
+ }
+
+ oldWrk.join(500);
+
+ old = clientMsgWorkers.putIfAbsent(nodeId, clientProc);
+
+ if (old == null)
+ break;
+
+ if (log.isDebugEnabled())
+ log.debug("Already have client message worker, closing connection " +
+ "[locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId +
+ ", workerSock=" + oldWrk.sock +
+ ", sock=" + sock + ']');
+
+ return;
+ }
+ else if (old instanceof ClientNioMessageWorker) {
+ final ClientNioMessageWorker nioOldWrk = (ClientNioMessageWorker)old;
+
+ if (nioOldWrk.state() == WorkerState.STOPPED)
+ clientMsgWorkers.remove(nodeId, nioOldWrk);
+ else {
+ // check if old worker is stopping
+ for (int i = 0; i < 5; i++) {
+ U.sleep(100);
+
+ if (nioOldWrk.state() == WorkerState.STOPPED)
+ break;
+ }
+
+ old = clientMsgWorkers.putIfAbsent(nodeId, clientProc);
+
+ if (old == null)
+ break;
+
+ if (log.isDebugEnabled())
+ log.debug("Already have client message worker, closing connection " +
+ "[locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId +
+ ", workerSock=" + nioOldWrk.sock +
+ ", sock=" + sock + ']');
+
+ return;
+ }
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Created client message worker [locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
+
+ assert clientProc == clientMsgWorkers.get(nodeId);
+
+ clientMsgWrk = clientProc;
+
+ nioClient = clientMsgWrk instanceof ClientNioMessageWorker;
+ }
+
+ if (nioClient) {
+ final ClientNioMessageWorker nioWrk = (ClientNioMessageWorker)clientMsgWrk;
+
+ nioWrk.start();
+
+ nioWrk.sendMessage(res, null);
+
+ return;
+ }
+
+ spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ?
+ spi.failureDetectionTimeout() : spi.getSocketTimeout());
+
+ if (log.isDebugEnabled())
+ log.debug("Initialized connection with remote node [nodeId=" + nodeId +
+ ", client=" + req.client() + ']');
+
+ if (debugMode) {
+ debugLog(msg, "Initialized connection with remote node [nodeId=" + nodeId +
+ ", client=" + req.client() + ']');
+ }
+ }
+ catch (IOException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
+
+ if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0())
+ LT.warn(log, "Failed to initialize connection " +
+ "(missing SSL configuration on remote node?) " +
+ "[rmtAddr=" + sock.getInetAddress() + ']', true);
+ else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed())
+ && !spi.isNodeStopping0()) {
+ if (U.isMacInvalidArgumentError(e))
+ LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" +
+ U.MAC_INVALID_ARG_MSG);
+ else {
+ U.error(
+ log,
+ "Failed to initialize connection (this can happen due to short time " +
+ "network problems and can be ignored if does not affect node discovery) " +
+ "[sock=" + sock + ']',
+ e);
+ }
+ }
+
+ onException("Caught exception on handshake [err=" + e + ", sock=" + sock + ']', e);
+
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
+
+ onException("Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
+
+ if (e.hasCause(SocketTimeoutException.class))
+ LT.warn(log, "Socket operation timed out on handshake " +
+ "(consider increasing 'networkTimeout' configuration property) " +
+ "[netTimeout=" + spi.netTimeout + ']');
+
+ else if (e.hasCause(ClassNotFoundException.class))
+ LT.warn(log, "Failed to read message due to ClassNotFoundException " +
+ "(make sure same versions of all classes are available on all nodes) " +
+ "[rmtAddr=" + sock.getRemoteSocketAddress() +
+ ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');
+
+ // Always report marshalling problems.
+ else if (e.hasCause(ObjectStreamException.class) ||
+ (!sock.isClosed() && !e.hasCause(IOException.class)))
+ LT.error(log, e, "Failed to initialize connection [sock=" + sock + ']');
+
+ return;
+ }
+
+ long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ spi.getSocketTimeout();
+
+ final ClientMessageWorker clientMsgWrk0 = clientMsgWrk == null ? null
+ : (ClientMessageWorker)clientMsgWrk;
+
+ while (!isInterrupted()) {
+ try {
+ TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
+ U.resolveClassLoader(spi.ignite().configuration()));
+
+ msg.senderNodeId(nodeId);
+
- if (log.isDebugEnabled())
- log.debug("Message has been received: " + msg);
++ DebugLogger debugLog = messageLogger(msg);
++
++ if (debugLog.isDebugEnabled())
++ debugLog.debug("Message has been received: " + msg);
+
+ spi.stats.onMessageReceived(msg);
+
+ if (debugMode && recordable(msg))
+ debugLog(msg, "Message has been received: " + msg);
+
+ if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+
+ continue;
+ }
+ else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+ TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
+
+ if (!req.responded()) {
+ boolean ok = processJoinRequestMessage(req, clientMsgWrk0);
+
+ if (clientMsgWrk0 != null && ok)
+ continue;
+ else
+ // Direct join request - no need to handle this socket anymore.
+ break;
+ }
+ }
+ else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+ if (clientMsgWrk0 != null) {
+ TcpDiscoverySpiState state = spiStateCopy();
+
+ if (state == CONNECTED) {
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+
+ if (clientMsgWrk0.getState() == State.NEW)
+ clientMsgWrk0.start();
+
+ msgWorker.addMessage(msg);
+
+ continue;
+ }
+ else {
+ spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
+
+ break;
+ }
+ }
+ }
+ else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
+ // Send receipt back.
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+
+ boolean ignored = false;
+
+ TcpDiscoverySpiState state = null;
+
+ synchronized (mux) {
+ if (spiState == CONNECTING) {
+ joinRes.set(msg);
+
+ spiState = DUPLICATE_ID;
+
+ mux.notifyAll();
+ }
+ else {
+ ignored = true;
+
+ state = spiState;
+ }
+ }
+
+ if (ignored && log.isDebugEnabled())
+ log.debug("Duplicate ID message has been ignored [msg=" + msg +
+ ", spiState=" + state + ']');
+
+ continue;
+ }
+ else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
+ // Send receipt back.
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+
+ boolean ignored = false;
+
+ TcpDiscoverySpiState state = null;
+
+ synchronized (mux) {
+ if (spiState == CONNECTING) {
+ joinRes.set(msg);
+
+ spiState = AUTH_FAILED;
+
+ mux.notifyAll();
+ }
+ else {
+ ignored = true;
+
+ state = spiState;
+ }
+ }
+
+ if (ignored && log.isDebugEnabled())
+ log.debug("Auth failed message has been ignored [msg=" + msg +
+ ", spiState=" + state + ']');
+
+ continue;
+ }
+ else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
+ // Send receipt back.
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+
+ boolean ignored = false;
+
+ TcpDiscoverySpiState state = null;
+
+ synchronized (mux) {
+ if (spiState == CONNECTING) {
+ joinRes.set(msg);
+
+ spiState = CHECK_FAILED;
+
+ mux.notifyAll();
+ }
+ else {
+ ignored = true;
+
+ state = spiState;
+ }
+ }
+
+ if (ignored && log.isDebugEnabled())
+ log.debug("Check failed message has been ignored [msg=" + msg +
+ ", spiState=" + state + ']');
+
+ continue;
+ }
+ else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
+ // Send receipt back.
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+
+ boolean ignored = false;
+
+ TcpDiscoverySpiState state = null;
+
+ synchronized (mux) {
+ if (spiState == CONNECTING) {
+ joinRes.set(msg);
+
+ spiState = LOOPBACK_PROBLEM;
+
+ mux.notifyAll();
+ }
+ else {
+ ignored = true;
+
+ state = spiState;
+ }
+ }
+
+ if (ignored && log.isDebugEnabled())
+ log.debug("Loopback problem message has been ignored [msg=" + msg +
+ ", spiState=" + state + ']');
+
+ continue;
+ }
+ if (msg instanceof TcpDiscoveryPingResponse) {
+ assert msg.client() : msg;
+
+ ClientMessageProcessor clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
+
+ if (clientWorker != null)
+ clientWorker.pingResult(true);
+
+ continue;
+ }
+
+ TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null;
+
+ if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
+ heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg;
+ else
+ msgWorker.addMessage(msg);
+
+ // Send receipt back.
+ if (clientMsgWrk != null) {
+ TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id());
+
+ ack.verify(locNodeId);
+
+ clientMsgWrk.addMessage(ack);
+ }
+ else
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+
+ if (heartbeatMsg != null)
+ processClientHeartbeatMessage(heartbeatMsg);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Caught exception on message read [sock=" + sock +
+ ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);
+
+ onException("Caught exception on message read [sock=" + sock +
+ ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);
+
+ if (isInterrupted() || sock.isClosed())
+ return;
+
+ if (e.hasCause(ClassNotFoundException.class))
+ LT.warn(log, "Failed to read message due to ClassNotFoundException " +
+ "(make sure same versions of all classes are available on all nodes) " +
+ "[rmtNodeId=" + nodeId +
+ ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');
+
+ // Always report marshalling errors.
+ boolean err = e.hasCause(ObjectStreamException.class) ||
+ (nodeAlive(nodeId) && spiStateCopy() == CONNECTED && !X.hasCause(e, IOException.class));
+
+ if (err)
+ LT.error(log, e, "Failed to read message [sock=" + sock + ", locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId + ']');
+
+ return;
+ }
+ catch (IOException e) {
+ if (log.isDebugEnabled())
+ U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId + ']', e);
+
+ if (isInterrupted() || sock.isClosed())
+ return;
+
+ // Always report marshalling errors (although it is strange here).
+ boolean err = X.hasCause(e, ObjectStreamException.class) ||
+ (nodeAlive(nodeId) && spiStateCopy() == CONNECTED);
+
+ if (err)
+ LT.error(log, e, "Failed to send receipt on message [sock=" + sock +
+ ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']');
+
+ onException("Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId + ']', e);
+
+ return;
+ }
+ }
+ }
+ finally {
+ if (clientMsgWrk != null) {
+ if (log.isDebugEnabled())
+ log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId +
+ ", rmtNodeId=" + nodeId + ']');
+
+ if (!nioClient) {
+ clientMsgWorkers.remove(nodeId, clientMsgWrk);
+
+ U.interrupt((ClientMessageWorker)clientMsgWrk);
+ }
+ }
+
+ if (!nioClient)
+ U.closeQuiet(sock);
+ }
+ }
+
+ /**
+ * Stores the metrics carried by a client heartbeat on the processor registered for that client.
+ * A heartbeat whose creator node has no registered processor is only reported at debug level.
+ *
+ * @param msg Heartbeat message; asserted to originate from a client.
+ */
+ private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) {
+     assert msg.client();
+
+     ClientMessageProcessor clientProc = clientMsgWorkers.get(msg.creatorNodeId());
+
+     if (clientProc == null) {
+         if (log.isDebugEnabled())
+             log.debug("Received heartbeat message from unknown client node: " + msg);
+
+         return;
+     }
+
+     clientProc.metrics(msg.metrics());
+ }
+
+ /**
+ * Answers a join request on the reader's socket, choosing the receipt from the local SPI state.
+ * <p>
+ * If the local node is connected, {@code RES_OK} is written, the request is marked as responded,
+ * a client worker that is still in the {@code NEW} state is started and the request is queued
+ * to the message worker. Otherwise {@code RES_WAIT} or {@code RES_CONTINUE_JOIN} is written and
+ * the request is not queued.
+ *
+ * @param msg Join request message.
+ * @param clientMsgWrk Client message worker to start.
+ * @return Whether connection was successful.
+ * @throws IOException If IO failed.
+ * @throws IgniteCheckedException Declared in the signature; the call that raises it is not visible
+ *      in this method (NOTE(review): confirm against the called helpers).
+ */
+ @SuppressWarnings({"IfMayBeConditional"})
+ private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg,
+     @Nullable ClientMessageWorker clientMsgWrk) throws IOException, IgniteCheckedException {
+     assert msg != null;
+     assert !msg.responded();
+
+     TcpDiscoverySpiState curState = spiStateCopy();
+
+     long timeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+         spi.getSocketTimeout();
+
+     if (curState != CONNECTED) {
+         spi.stats.onMessageProcessingStarted(msg);
+
+         SocketAddress rmtAddr = sock.getRemoteSocketAddress();
+
+         // Default answer: local node is stopping, or remote node responded to join request
+         // and wins UUID race. Remote node should continue with the next address.
+         Integer res = RES_CONTINUE_JOIN;
+
+         if (curState == CONNECTING) {
+             // Remote node has not responded to join request or loses UUID race.
+             if (noResAddrs.contains(rmtAddr) ||
+                 getLocalNodeId().compareTo(msg.creatorNodeId()) < 0)
+                 res = RES_WAIT;
+         }
+
+         spi.writeToSocket(msg, sock, res, timeout);
+
+         if (log.isDebugEnabled())
+             log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
+
+         fromAddrs.addAll(msg.node().socketAddresses());
+
+         spi.stats.onMessageProcessingFinished(msg);
+
+         return false;
+     }
+
+     spi.writeToSocket(msg, sock, RES_OK, timeout);
+
+     if (log.isDebugEnabled())
+         log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
+
+     msg.responded(true);
+
+     boolean startWorker = clientMsgWrk != null && clientMsgWrk.getState() == State.NEW;
+
+     if (startWorker) {
+         // The client version is recorded before the worker thread starts.
+         clientMsgWrk.clientVersion(U.productVersion(msg.node()));
+
+         clientMsgWrk.start();
+     }
+
+     msgWorker.addMessage(msg);
+
+     return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After delegating to the superclass, quietly closes this reader's socket.
+ * NOTE(review): presumably the close is what releases a read blocked on the socket - confirm.
+ */
+ @Override public void interrupt() {
+ super.interrupt();
+
+ U.closeQuiet(sock);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Closes the socket unless {@code nioClient} is set, removes this reader from {@code readers}
+ * while holding {@code mux}, and reports the removal to the SPI statistics.
+ * NOTE(review): the socket is presumably left open for NIO clients because the NIO client worker
+ * keeps using it - confirm.
+ */
+ @Override protected void cleanup() {
+ super.cleanup();
+
+ if (!nioClient)
+ U.closeQuiet(sock);
+
+ synchronized (mux) {
+ readers.remove(this);
+ }
+
+ spi.stats.onSocketReaderRemoved();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "Socket reader [id=" + getId() + ", name=" + getName() + ", nodeId=" + nodeId + ']';
+ }
+ }
+
+ /**
+ * SPI Statistics printer.
+ * <p>
+ * Thread that calls {@code printStatistics()} after each sleep of {@code spi.statsPrintFreq}
+ * milliseconds until it is interrupted.
+ */
+ private class StatisticsPrinter extends IgniteSpiThread {
+ /**
+ * Constructor.
+ * <p>
+ * Asserts that the configured print frequency is positive and that info logging is enabled,
+ * then applies the SPI thread priority.
+ */
+ StatisticsPrinter() {
+ super(spi.ignite().name(), "tcp-disco-stats-printer", log);
+
+ assert spi.statsPrintFreq > 0;
+
+ assert log.isInfoEnabled();
+
+ setPriority(spi.threadPri);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The loop sleeps before printing, so the first report is produced one full period after start.
+ *
+ * @throws InterruptedException If the thread is interrupted while sleeping.
+ */
+ @SuppressWarnings({"BusyWait"})
+ @Override protected void body() throws InterruptedException {
+ if (log.isDebugEnabled())
+ log.debug("Statistics printer has been started.");
+
+ while (!isInterrupted()) {
+ Thread.sleep(spi.statsPrintFreq);
+
+ printStatistics();
+ }
+ }
+ }
+
+ /**
+ * Provides communication with client node.
+ * <p>
+ * The socket reader creates either a {@code ClientMessageWorker} or a {@code ClientNioMessageWorker}
+ * as the processor for a connected client and registers it in {@code clientMsgWorkers} by client node ID.
+ */
+ private interface ClientMessageProcessor {
+ /**
+ * Passes a message to this processor for the client.
+ *
+ * @param msg Message.
+ */
+ void addMessage(TcpDiscoveryAbstractMessage msg);
+
+ /**
+ * Passes a message to this processor for the client, optionally together with its marshalled form.
+ *
+ * @param msg Message.
+ * @param msgBytes Marshalled message, or {@code null} if not available.
+ */
+ void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes);
+
+ /**
+ * Pings the client; the socket reader reports the returned flag as "client exists" in a ping response.
+ *
+ * @param timeoutHelper Timeout helper.
+ * @return Success flag.
+ * @throws InterruptedException If interrupted.
+ */
+ boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException;
+
+ /**
+ * Reports the outcome of a ping; the socket reader passes {@code true} when a ping response arrives.
+ *
+ * @param res Ping result.
+ */
+ void pingResult(boolean res);
+
+ /**
+ * @return Client ID.
+ */
+ UUID clientNodeId();
+
+ /**
+ * @return Cluster metrics.
+ */
+ ClusterMetrics metrics();
+
+ /**
+ * Replaces the stored metrics, e.g. with those received in a client heartbeat.
+ *
+ * @param metrics Cluster metrics.
+ */
+ void metrics(ClusterMetrics metrics);
+ }
+
+ /**
+ * Thread-based {@link ClientMessageProcessor}: queues messages addressed to one client node and writes
+ * them to that client's socket from its own worker thread. Any failure while writing unregisters the
+ * worker, interrupts it and closes the socket.
+ */
+ private class ClientMessageWorker extends MessageWorkerAdapter<T2<TcpDiscoveryAbstractMessage, byte[]>>
+     implements ClientMessageProcessor {
+     /** Node ID. */
+     private final UUID clientNodeId;
+
+     /** Socket. */
+     private final Socket sock;
+
+     /** Current client metrics. */
+     private volatile ClusterMetrics metrics;
+
+     /** Client product version; set via {@link #clientVersion} or resolved from the node when an ack is sent. */
+     private IgniteProductVersion clientVer;
+
+     /** Lazily created pinger, see {@link #pinger()}. */
+     private volatile ClientMessagePinger pinger;
+
+     /**
+      * @param sock Socket.
+      * @param clientNodeId Node ID.
+      * @throws IOException Declared in the signature; no statement of this constructor raises it visibly.
+      */
+     protected ClientMessageWorker(Socket sock, UUID clientNodeId) throws IOException {
+         super("tcp-disco-client-message-worker", 2000);
+
+         this.sock = sock;
+         this.clientNodeId = clientNodeId;
+     }
+
+     /**
+      * @param clientVer Client version.
+      */
+     void clientVersion(IgniteProductVersion clientVer) {
+         this.clientVer = clientVer;
+     }
+
+     /**
+      * @return Current client metrics.
+      */
+     @Override public ClusterMetrics metrics() {
+         return metrics;
+     }
+
+     /**
+      * @param metrics New current client metrics.
+      */
+     @Override public void metrics(ClusterMetrics metrics) {
+         this.metrics = metrics;
+     }
+
+     /**
+      * Queues a message without pre-marshalled bytes.
+      *
+      * @param msg Message.
+      */
+     @Override public void addMessage(TcpDiscoveryAbstractMessage msg) {
+         addMessage(msg, null);
+     }
+
+     /**
+      * Queues a message for sending; a high-priority message is placed at the head of the queue.
+      *
+      * @param msg Message.
+      * @param msgBytes Optional message bytes.
+      */
+     @Override public void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
+         // Parameterized (not raw) tuple keeps the queue insertion type-checked.
+         T2<TcpDiscoveryAbstractMessage, byte[]> t = new T2<>(msg, msgBytes);
+
+         if (msg.highPriority())
+             queue.addFirst(t);
+         else
+             queue.add(t);
+
++        DebugLogger log = messageLogger(msg);
++
+         if (log.isDebugEnabled())
+             log.debug("Message has been added to client queue: " + msg);
+     }
+
+     /** {@inheritDoc} */
+     @Override public UUID clientNodeId() {
+         return clientNodeId;
+     }
+
+     /**
+      * Writes one queued message to the client socket, marshalling it first when no bytes were queued.
+      * An ack response is written only if the client version is known and not older than
+      * {@code CLIENT_ACK_SINCE_VERSION}. If the write does not complete, the worker removes itself
+      * from {@code clientMsgWorkers}, interrupts itself and closes the socket.
+      *
+      * @param msgT Message and its optional marshalled bytes.
+      */
+     @Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
+         boolean success = false;
+
+         TcpDiscoveryAbstractMessage msg = msgT.get1();
+
+         try {
+             assert msg.verified() : msg;
+
+             byte[] msgBytes = msgT.get2();
+
+             if (msgBytes == null)
+                 msgBytes = U.marshal(spi.marshaller(), msg);
+
++            DebugLogger msgLog = messageLogger(msg);
++
+             if (msg instanceof TcpDiscoveryClientAckResponse) {
+                 if (clientVer == null) {
+                     ClusterNode node = spi.getNode(clientNodeId);
+
+                     if (node != null)
+                         clientVer = IgniteUtils.productVersion(node);
-                     else if (log.isDebugEnabled())
-                         log.debug("Skip sending message ack to client, fail to get client node " +
++                    else if (msgLog.isDebugEnabled())
++                        msgLog.debug("Skip sending message ack to client, fail to get client node " +
+                             "[sock=" + sock + ", locNodeId=" + getLocalNodeId() +
+                             ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+                 }
+
+                 if (clientVer != null &&
+                     clientVer.compareTo(TcpDiscoveryClientAckResponse.CLIENT_ACK_SINCE_VERSION) >= 0) {
-                     if (log.isDebugEnabled())
-                         log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
++                    if (msgLog.isDebugEnabled())
++                        msgLog.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
+                             + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+
+                     spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
+                         spi.failureDetectionTimeout() : spi.getSocketTimeout());
+                 }
+             }
+             else {
-                 if (log.isDebugEnabled())
-                     log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
++                if (msgLog.isDebugEnabled())
++                    msgLog.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+                         + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+
+                 assert topologyInitialized(msg) : msg;
+
+                 spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
+                     spi.failureDetectionTimeout() : spi.getSocketTimeout());
+             }
+
+             success = true;
+         }
+         catch (IgniteCheckedException | IOException e) {
+             if (log.isDebugEnabled())
+                 U.error(log, "Client connection failed [sock=" + sock + ", locNodeId="
+                     + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e);
+
+             onException("Client connection failed [sock=" + sock + ", locNodeId="
+                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']', e);
+         }
+         finally {
+             // Also reached when an assertion or another unchecked error aborted the write.
+             if (!success) {
+                 clientMsgWorkers.remove(clientNodeId, this);
+
+                 U.interrupt(this);
+
+                 U.closeQuiet(sock);
+             }
+         }
+     }
+
+     /**
+      * Checks that a node-added message about this very client carries a topology.
+      *
+      * @param msg Message.
+      * @return {@code True} if topology initialized.
+      */
+     private boolean topologyInitialized(TcpDiscoveryAbstractMessage msg) {
+         if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+             TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+             if (clientNodeId.equals(addedMsg.node().id()))
+                 return addedMsg.topology() != null;
+         }
+
+         return true;
+     }
+
+     /**
+      * @param res Ping result.
+      */
+     @Override public void pingResult(boolean res) {
+         pinger().pingResult(res);
+     }
+
+     /**
+      * @param timeoutHelper Timeout controller.
+      * @return Ping result.
+      * @throws InterruptedException If interrupted.
+      */
+     @Override public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
+         return pinger().ping(timeoutHelper);
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Reports a failed ping result to the pinger and closes the socket.
+      */
+     @Override protected void cleanup() {
+         super.cleanup();
+
+         pingResult(false);
+
+         U.closeQuiet(sock);
+     }
+
+     /**
+      * Returns the pinger, creating it on first use while synchronized on this worker.
+      *
+      * @return Pinger.
+      */
+     private ClientMessagePinger pinger() {
+         if (pinger == null) {
+             synchronized (this) {
+                 if (pinger == null)
+                     pinger = new ClientMessagePinger(this);
+             }
+         }
+
+         return pinger;
+     }
+
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(ClientMessageWorker.class, this);
+     }
+ }
+
+ /**
+ * Base class for message workers.
+ * <p>
+ * Owns a {@link LinkedBlockingDeque} created without a capacity argument and a thread loop that hands
+ * every polled element to {@link #processMessage(Object)}; a poll that times out triggers
+ * {@link #noMessageLoop()} instead.
+ *
+ * @param <T> Type of the queued elements.
+ */
+ protected abstract class MessageWorkerAdapter<T> extends IgniteSpiThread {
+     /** Queue of elements waiting to be processed. */
+     protected final BlockingDeque<T> queue = new LinkedBlockingDeque<>();
+
+     /** Interruption flag kept in addition to the thread's own interrupted status. */
+     private volatile boolean interrupted;
+
+     /** Longest wait for a queued element, in milliseconds. */
+     private final long pollingTimeout;
+
+     /**
+      * @param name Thread name.
+      * @param pollingTimeout Messages polling timeout.
+      */
+     MessageWorkerAdapter(String name, long pollingTimeout) {
+         super(spi.ignite().name(), name, log);
+
+         this.pollingTimeout = pollingTimeout;
+
+         setPriority(spi.threadPri);
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Polls the queue until the worker is interrupted.
+      *
+      * @throws InterruptedException If interrupted while waiting on the queue.
+      */
+     @Override protected void body() throws InterruptedException {
+         if (log.isDebugEnabled())
+             log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
+
+         while (!isInterrupted()) {
+             T next = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
+
+             if (next != null) {
+                 processMessage(next);
+
+                 continue;
+             }
+
+             // Nothing arrived within the polling timeout.
+             noMessageLoop();
+         }
+     }
+
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Raises the backing flag before delegating to the thread's own interrupt.
+      */
+     @Override public void interrupt() {
+         interrupted = true;
+
+         super.interrupt();
+     }
+
+     /**
+      * {@inheritDoc}
+      *
+      * @return {@code True} if the backing flag is set or the thread reports itself interrupted.
+      */
+     @Override public boolean isInterrupted() {
+         if (interrupted)
+             return true;
+
+         return super.isInterrupted();
+     }
+
+     /**
+      * @return Current queue size.
+      */
+     int queueSize() {
+         return queue.size();
+     }
+
+     /**
+      * Handles one element taken from the queue.
+      *
+      * @param msg Message.
+      */
+     protected abstract void processMessage(T msg);
+
+     /**
+      * Hook invoked when a poll timed out without an element; does nothing by default.
+      */
+     protected void noMessageLoop() {
+         // No-op.
+     }
+ }
+
+ /**
+ * Future adapter that can additionally hold a reference to the socket associated with a ping.
+ *
+ * @param <R> Result type of the future.
+ */
+ private static class GridPingFutureAdapter<R> extends GridFutureAdapter<R> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Socket; {@code null} until one is associated through {@link #sock(Socket)}. */
+ private volatile Socket sock;
+
+ /**
+ * Returns socket associated with this ping future.
+ *
+ * @return Socket or {@code null} if no socket associated.
+ */
+ public Socket sock() {
+ return sock;
+ }
+
+ /**
+ * Associates socket with this ping future.
+ *
+ * @param sock Socket.
+ */
+ public void sock(Socket sock) {
+ this.sock = sock;
+ }
+ }
- ack.verify(locNodeId);
+ /**
+ * Socket decorator that overrides {@link Socket#getInputStream()}
+ * and {@link Socket#getOutputStream()} that return custom implementations which
+ * use passed {@link SSLEngine}.
+ */
+ static class NioSslSocket extends Socket {
+ /** Genuine socket. */
+ private final Socket delegate;
- clientMsgWrk.addMessage(ack);
- }
- else
- spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+ /** */
+ private final SocketChannel ch;
- if (heartbeatMsg != null)
-
<TRUNCATED>
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f388e3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f388e3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------