You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/10/11 12:29:22 UTC

ignite git commit: IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.

Repository: ignite
Updated Branches:
  refs/heads/master 8ffa1099e -> b0158fb2a


IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.

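Also fix forceClose() for GridTcpNioCommunicationClient: the implementation in GridAbstractCommunicationClient set the `closed` flag to false instead of true, a mistake introduced when the flag was migrated from int to boolean. Fixes #2787.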

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0158fb2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0158fb2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0158fb2

Branch: refs/heads/master
Commit: b0158fb2ab6de20922cc1b19597c5e17dae0b527
Parents: 8ffa109
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Wed Oct 11 15:29:04 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 11 15:29:04 2017 +0300

----------------------------------------------------------------------
 .../nio/GridAbstractCommunicationClient.java    |   2 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 378 ++++++++++---------
 2 files changed, 192 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 6302d84..ed7e929 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -59,7 +59,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
 
     /** {@inheritDoc} */
     @Override public void forceClose() {
-        closed.set(false);
+        closed.set(true);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7a54666..a0ee389 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2894,12 +2894,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             }
 
             try {
-                safeHandshake(client,
-                    null,
-                    node.id(),
-                    timeoutHelper.nextTimeoutChunk(connTimeout0),
-                    null,
-                    null);
+                safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
             }
             catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
@@ -3063,7 +3058,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
         LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
 
-        boolean conn = false;
         GridCommunicationClient client = null;
         IgniteCheckedException errs = null;
 
@@ -3079,7 +3073,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
             int lastWaitingTimeout = 1;
 
-            while (!conn) { // Reconnection on handshake timeout.
+            while (client == null) { // Reconnection on handshake timeout.
+                boolean needWait = false;
+
                 try {
                     SocketChannel ch = SocketChannel.open();
 
@@ -3111,7 +3107,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         return null;
                     }
 
-                    Long rcvCnt = null;
+                    Long rcvCnt;
 
                     Map<Integer, Object> meta = new HashMap<>();
 
@@ -3132,7 +3128,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                         Integer handshakeConnIdx = connIdx;
 
-                        rcvCnt = safeHandshake(ch,
+                        rcvCnt = safeTcpHandshake(ch,
                             recoveryDesc,
                             node.id(),
                             timeoutHelper.nextTimeoutChunk(connTimeout0),
@@ -3140,34 +3136,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             handshakeConnIdx);
 
                         if (rcvCnt == ALREADY_CONNECTED) {
-                            recoveryDesc.release();
-
                             return null;
                         }
                         else if (rcvCnt == NODE_STOPPING) {
-                            recoveryDesc.release();
-
                             throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
                         }
                         else if (rcvCnt == NEED_WAIT) {
-                            recoveryDesc.release();
-
-                            U.closeQuiet(ch);
-
-                            if (lastWaitingTimeout < 60000)
-                                lastWaitingTimeout *= 2;
-
-                            U.sleep(lastWaitingTimeout);
+                            needWait = true;
 
                             continue;
                         }
-                    }
-                    finally {
-                        if (recoveryDesc != null && rcvCnt == null)
-                            recoveryDesc.release();
-                    }
 
-                    try {
                         meta.put(CONN_IDX_META, connKey);
 
                         if (recoveryDesc != null) {
@@ -3179,13 +3158,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
 
                         client = new GridTcpNioCommunicationClient(connIdx, ses, log);
-
-                        conn = true;
                     }
                     finally {
-                        if (!conn) {
+                        if (client == null) {
+                            U.closeQuiet(ch);
+
                             if (recoveryDesc != null)
                                 recoveryDesc.release();
+
+                            if (needWait) {
+                                if (lastWaitingTimeout < 60000)
+                                    lastWaitingTimeout *= 2;
+
+                                U.sleep(lastWaitingTimeout);
+                            }
                         }
                     }
                 }
@@ -3307,7 +3293,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 }
             }
 
-            if (conn)
+            if (client != null)
                 break;
         }
 
@@ -3362,6 +3348,42 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * Performs handshake in timeout-safe way.
      *
      * @param client Client.
+     * @param rmtNodeId Remote node.
+     * @param timeout Timeout for handshake.
+     * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
+     */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    private void safeShmemHandshake(
+        GridCommunicationClient client,
+        UUID rmtNodeId,
+        long timeout
+    ) throws IgniteCheckedException {
+        HandshakeTimeoutObject<GridCommunicationClient> obj = new HandshakeTimeoutObject<>(client,
+            U.currentTimeMillis() + timeout);
+
+        addTimeoutObject(obj);
+
+        try {
+            client.doHandshake(new HandshakeClosure(rmtNodeId));
+        }
+        finally {
+            boolean cancelled = obj.cancel();
+
+            if (cancelled)
+                removeTimeoutObject(obj);
+
+            // Ignoring whatever happened after timeout - reporting only timeout event.
+            if (!cancelled)
+                throw new HandshakeTimeoutException(
+                    new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
+                        "(consider increasing 'connectionTimeout' configuration property)."));
+        }
+    }
+
+    /**
+     * Performs handshake in timeout-safe way.
+     *
+     * @param ch Socket channel.
      * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
      * @param rmtNodeId Remote node.
      * @param timeout Timeout for handshake.
@@ -3371,233 +3393,215 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @return Handshake response.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    private <T> long safeHandshake(
-        T client,
+    private long safeTcpHandshake(
+        SocketChannel ch,
         @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
         long timeout,
         GridSslMeta sslMeta,
         @Nullable Integer handshakeConnIdx
     ) throws IgniteCheckedException {
-        HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+        HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);
 
         addTimeoutObject(obj);
 
         long rcvCnt = 0;
 
         try {
-            if (client instanceof GridCommunicationClient)
-                ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
-            else {
-                SocketChannel ch = (SocketChannel)client;
+            BlockingSslHandler sslHnd = null;
 
-                boolean success = false;
+            ByteBuffer buf;
 
-                try {
-                    BlockingSslHandler sslHnd = null;
+            if (isSslEnabled()) {
+                assert sslMeta != null;
 
-                    ByteBuffer buf;
+                sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
 
-                    if (isSslEnabled()) {
-                        assert sslMeta != null;
+                if (!sslHnd.handshake())
+                    throw new HandshakeException("SSL handshake is not completed.");
 
-                        sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+                ByteBuffer handBuff = sslHnd.applicationBuffer();
 
-                        if (!sslHnd.handshake())
-                            throw new HandshakeException("SSL handshake is not completed.");
+                if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
+                    buf = ByteBuffer.allocate(1000);
 
-                        ByteBuffer handBuff = sslHnd.applicationBuffer();
+                    int read = ch.read(buf);
 
-                        if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
-                            buf = ByteBuffer.allocate(1000);
+                    if (read == -1)
+                        throw new HandshakeException("Failed to read remote node ID (connection closed).");
 
-                            int read = ch.read(buf);
+                    buf.flip();
 
-                            if (read == -1)
-                                throw new HandshakeException("Failed to read remote node ID (connection closed).");
-
-                            buf.flip();
-
-                            buf = sslHnd.decode(buf);
-                        }
-                        else
-                            buf = handBuff;
-                    }
-                    else {
-                        buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
+                    buf = sslHnd.decode(buf);
+                }
+                else
+                    buf = handBuff;
+            }
+            else {
+                buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
 
-                        for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
-                            int read = ch.read(buf);
+                for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
+                    int read = ch.read(buf);
 
-                            if (read == -1)
-                                throw new HandshakeException("Failed to read remote node ID (connection closed).");
+                    if (read == -1)
+                        throw new HandshakeException("Failed to read remote node ID (connection closed).");
 
-                            i += read;
-                        }
-                    }
+                    i += read;
+                }
+            }
 
-                    UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
+            UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
 
-                    if (!rmtNodeId.equals(rmtNodeId0))
-                        throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
-                            ", rcvd=" + rmtNodeId0 + ']');
-                    else if (log.isDebugEnabled())
-                        log.debug("Received remote node ID: " + rmtNodeId0);
+            if (!rmtNodeId.equals(rmtNodeId0))
+                throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
+                    ", rcvd=" + rmtNodeId0 + ']');
+            else if (log.isDebugEnabled())
+                log.debug("Received remote node ID: " + rmtNodeId0);
 
-                    if (isSslEnabled()) {
-                        assert sslHnd != null;
+            if (isSslEnabled()) {
+                assert sslHnd != null;
 
-                        ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
-                    }
-                    else
-                        ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+                ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+            }
+            else
+                ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
-                    ClusterNode locNode = getLocalNode();
+            ClusterNode locNode = getLocalNode();
 
-                    if (locNode == null)
-                        throw new IgniteCheckedException("Local node has not been started or " +
-                            "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+            if (locNode == null)
+                throw new IgniteCheckedException("Local node has not been started or " +
+                    "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
 
-                    if (recovery != null) {
-                        HandshakeMessage msg;
+            if (recovery != null) {
+                HandshakeMessage msg;
 
-                        int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
+                int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
 
-                        if (handshakeConnIdx != null) {
-                            msg = new HandshakeMessage2(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received(),
-                                handshakeConnIdx);
+                if (handshakeConnIdx != null) {
+                    msg = new HandshakeMessage2(locNode.id(),
+                        recovery.incrementConnectCount(),
+                        recovery.received(),
+                        handshakeConnIdx);
 
-                            msgSize += 4;
-                        }
-                        else {
-                            msg = new HandshakeMessage(locNode.id(),
-                                recovery.incrementConnectCount(),
-                                recovery.received());
-                        }
+                    msgSize += 4;
+                }
+                else {
+                    msg = new HandshakeMessage(locNode.id(),
+                        recovery.incrementConnectCount(),
+                        recovery.received());
+                }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Writing handshake message [locNodeId=" + locNode.id() +
-                                ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Writing handshake message [locNodeId=" + locNode.id() +
+                        ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
 
-                        buf = ByteBuffer.allocate(msgSize);
+                buf = ByteBuffer.allocate(msgSize);
 
-                        buf.order(ByteOrder.nativeOrder());
+                buf.order(ByteOrder.nativeOrder());
 
-                        boolean written = msg.writeTo(buf, null);
+                boolean written = msg.writeTo(buf, null);
 
-                        assert written;
+                assert written;
 
-                        buf.flip();
+                buf.flip();
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                if (isSslEnabled()) {
+                    assert sslHnd != null;
 
-                            ch.write(sslHnd.encrypt(buf));
-                        }
-                        else
-                            ch.write(buf);
-                    }
-                    else {
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                    ch.write(sslHnd.encrypt(buf));
+                }
+                else
+                    ch.write(buf);
+            }
+            else {
+                if (isSslEnabled()) {
+                    assert sslHnd != null;
 
-                            ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
-                        }
-                        else
-                            ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
-                    }
+                    ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
+                }
+                else
+                    ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
+            }
 
-                    if (recovery != null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+            if (recovery != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
 
-                        if (isSslEnabled()) {
-                            assert sslHnd != null;
+                if (isSslEnabled()) {
+                    assert sslHnd != null;
 
-                            buf = ByteBuffer.allocate(1000);
-                            buf.order(ByteOrder.nativeOrder());
+                    buf = ByteBuffer.allocate(1000);
+                    buf.order(ByteOrder.nativeOrder());
 
-                            ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
-                            decode.order(ByteOrder.nativeOrder());
+                    ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
+                    decode.order(ByteOrder.nativeOrder());
 
-                            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                                int read = ch.read(buf);
+                    for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+                        int read = ch.read(buf);
 
-                                if (read == -1)
-                                    throw new HandshakeException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+                        if (read == -1)
+                            throw new HandshakeException("Failed to read remote node recovery handshake " +
+                                "(connection closed).");
 
-                                buf.flip();
+                        buf.flip();
 
-                                ByteBuffer decode0 = sslHnd.decode(buf);
+                        ByteBuffer decode0 = sslHnd.decode(buf);
 
-                                i += decode0.remaining();
+                        i += decode0.remaining();
 
-                                decode = appendAndResizeIfNeeded(decode, decode0);
+                        decode = appendAndResizeIfNeeded(decode, decode0);
 
-                                buf.clear();
-                            }
+                        buf.clear();
+                    }
 
-                            decode.flip();
+                    decode.flip();
 
-                            rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+                    rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
 
-                            if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
-                                decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                    if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
+                        decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
 
-                                sslMeta.decodedBuffer(decode);
-                            }
+                        sslMeta.decodedBuffer(decode);
+                    }
 
-                            ByteBuffer inBuf = sslHnd.inputBuffer();
+                    ByteBuffer inBuf = sslHnd.inputBuffer();
 
-                            if (inBuf.position() > 0)
-                                sslMeta.encodedBuffer(inBuf);
-                        }
-                        else {
-                            buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+                    if (inBuf.position() > 0)
+                        sslMeta.encodedBuffer(inBuf);
+                }
+                else {
+                    buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
 
-                            buf.order(ByteOrder.nativeOrder());
+                    buf.order(ByteOrder.nativeOrder());
 
-                            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
-                                int read = ch.read(buf);
+                    for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+                        int read = ch.read(buf);
 
-                                if (read == -1)
-                                    throw new HandshakeException("Failed to read remote node recovery handshake " +
-                                        "(connection closed).");
+                        if (read == -1)
+                            throw new HandshakeException("Failed to read remote node recovery handshake " +
+                                "(connection closed).");
 
-                                i += read;
-                            }
+                        i += read;
+                    }
 
-                            rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
-                        }
+                    rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
+                }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+                if (log.isDebugEnabled())
+                    log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
 
-                        if (rcvCnt == -1) {
-                            if (log.isDebugEnabled())
-                                log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
-                        }
-                        else
-                            success = true;
-                    }
-                    else
-                        success = true;
-                }
-                catch (IOException e) {
+                if (rcvCnt == -1) {
                     if (log.isDebugEnabled())
-                        log.debug("Failed to read from channel: " + e);
-
-                    throw new IgniteCheckedException("Failed to read from channel.", e);
-                }
-                finally {
-                    if (!success)
-                        U.closeQuiet(ch);
+                        log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
                 }
             }
         }
+        catch (IOException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to read from channel: " + e);
+
+            throw new IgniteCheckedException("Failed to read from channel.", e);
+        }
         finally {
             boolean cancelled = obj.cancel();