You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/10/11 12:29:22 UTC
ignite git commit: IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.
Repository: ignite
Updated Branches:
refs/heads/master 8ffa1099e -> b0158fb2a
IGNITE-6542 Reliably close SocketChannel in TcpCommunicationSpi.
Also fix forceClose() in GridAbstractCommunicationClient, which set the closed flag to false instead of true after the flag was migrated from int to boolean. Fixes #2787.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0158fb2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0158fb2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0158fb2
Branch: refs/heads/master
Commit: b0158fb2ab6de20922cc1b19597c5e17dae0b527
Parents: 8ffa109
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Wed Oct 11 15:29:04 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 11 15:29:04 2017 +0300
----------------------------------------------------------------------
.../nio/GridAbstractCommunicationClient.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 378 ++++++++++---------
2 files changed, 192 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 6302d84..ed7e929 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -59,7 +59,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
/** {@inheritDoc} */
@Override public void forceClose() {
- closed.set(false);
+ closed.set(true);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0158fb2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7a54666..a0ee389 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2894,12 +2894,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
try {
- safeHandshake(client,
- null,
- node.id(),
- timeoutHelper.nextTimeoutChunk(connTimeout0),
- null,
- null);
+ safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
}
catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
client.forceClose();
@@ -3063,7 +3058,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
- boolean conn = false;
GridCommunicationClient client = null;
IgniteCheckedException errs = null;
@@ -3079,7 +3073,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
int lastWaitingTimeout = 1;
- while (!conn) { // Reconnection on handshake timeout.
+ while (client == null) { // Reconnection on handshake timeout.
+ boolean needWait = false;
+
try {
SocketChannel ch = SocketChannel.open();
@@ -3111,7 +3107,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
return null;
}
- Long rcvCnt = null;
+ Long rcvCnt;
Map<Integer, Object> meta = new HashMap<>();
@@ -3132,7 +3128,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
Integer handshakeConnIdx = connIdx;
- rcvCnt = safeHandshake(ch,
+ rcvCnt = safeTcpHandshake(ch,
recoveryDesc,
node.id(),
timeoutHelper.nextTimeoutChunk(connTimeout0),
@@ -3140,34 +3136,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
handshakeConnIdx);
if (rcvCnt == ALREADY_CONNECTED) {
- recoveryDesc.release();
-
return null;
}
else if (rcvCnt == NODE_STOPPING) {
- recoveryDesc.release();
-
throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
}
else if (rcvCnt == NEED_WAIT) {
- recoveryDesc.release();
-
- U.closeQuiet(ch);
-
- if (lastWaitingTimeout < 60000)
- lastWaitingTimeout *= 2;
-
- U.sleep(lastWaitingTimeout);
+ needWait = true;
continue;
}
- }
- finally {
- if (recoveryDesc != null && rcvCnt == null)
- recoveryDesc.release();
- }
- try {
meta.put(CONN_IDX_META, connKey);
if (recoveryDesc != null) {
@@ -3179,13 +3158,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
client = new GridTcpNioCommunicationClient(connIdx, ses, log);
-
- conn = true;
}
finally {
- if (!conn) {
+ if (client == null) {
+ U.closeQuiet(ch);
+
if (recoveryDesc != null)
recoveryDesc.release();
+
+ if (needWait) {
+ if (lastWaitingTimeout < 60000)
+ lastWaitingTimeout *= 2;
+
+ U.sleep(lastWaitingTimeout);
+ }
}
}
}
@@ -3307,7 +3293,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
- if (conn)
+ if (client != null)
break;
}
@@ -3362,6 +3348,42 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* Performs handshake in timeout-safe way.
*
* @param client Client.
+ * @param rmtNodeId Remote node.
+ * @param timeout Timeout for handshake.
+ * @throws IgniteCheckedException If handshake failed or wasn't completed within timeout.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ private void safeShmemHandshake(
+ GridCommunicationClient client,
+ UUID rmtNodeId,
+ long timeout
+ ) throws IgniteCheckedException {
+ HandshakeTimeoutObject<GridCommunicationClient> obj = new HandshakeTimeoutObject<>(client,
+ U.currentTimeMillis() + timeout);
+
+ addTimeoutObject(obj);
+
+ try {
+ client.doHandshake(new HandshakeClosure(rmtNodeId));
+ }
+ finally {
+ boolean cancelled = obj.cancel();
+
+ if (cancelled)
+ removeTimeoutObject(obj);
+
+ // Ignoring whatever happened after timeout - reporting only timeout event.
+ if (!cancelled)
+ throw new HandshakeTimeoutException(
+ new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
+ "(consider increasing 'connectionTimeout' configuration property)."));
+ }
+ }
+
+ /**
+ * Performs handshake in timeout-safe way.
+ *
+ * @param ch Socket channel.
* @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
* @param rmtNodeId Remote node.
* @param timeout Timeout for handshake.
@@ -3371,233 +3393,215 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @return Handshake response.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- private <T> long safeHandshake(
- T client,
+ private long safeTcpHandshake(
+ SocketChannel ch,
@Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId,
long timeout,
GridSslMeta sslMeta,
@Nullable Integer handshakeConnIdx
) throws IgniteCheckedException {
- HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
+ HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
long rcvCnt = 0;
try {
- if (client instanceof GridCommunicationClient)
- ((GridCommunicationClient)client).doHandshake(new HandshakeClosure(rmtNodeId));
- else {
- SocketChannel ch = (SocketChannel)client;
+ BlockingSslHandler sslHnd = null;
- boolean success = false;
+ ByteBuffer buf;
- try {
- BlockingSslHandler sslHnd = null;
+ if (isSslEnabled()) {
+ assert sslMeta != null;
- ByteBuffer buf;
+ sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
- if (isSslEnabled()) {
- assert sslMeta != null;
+ if (!sslHnd.handshake())
+ throw new HandshakeException("SSL handshake is not completed.");
- sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf, ByteOrder.nativeOrder(), log);
+ ByteBuffer handBuff = sslHnd.applicationBuffer();
- if (!sslHnd.handshake())
- throw new HandshakeException("SSL handshake is not completed.");
+ if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
+ buf = ByteBuffer.allocate(1000);
- ByteBuffer handBuff = sslHnd.applicationBuffer();
+ int read = ch.read(buf);
- if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
- buf = ByteBuffer.allocate(1000);
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node ID (connection closed).");
- int read = ch.read(buf);
+ buf.flip();
- if (read == -1)
- throw new HandshakeException("Failed to read remote node ID (connection closed).");
-
- buf.flip();
-
- buf = sslHnd.decode(buf);
- }
- else
- buf = handBuff;
- }
- else {
- buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
+ buf = sslHnd.decode(buf);
+ }
+ else
+ buf = handBuff;
+ }
+ else {
+ buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
- for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new HandshakeException("Failed to read remote node ID (connection closed).");
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node ID (connection closed).");
- i += read;
- }
- }
+ i += read;
+ }
+ }
- UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
+ UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
- if (!rmtNodeId.equals(rmtNodeId0))
- throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
- ", rcvd=" + rmtNodeId0 + ']');
- else if (log.isDebugEnabled())
- log.debug("Received remote node ID: " + rmtNodeId0);
+ if (!rmtNodeId.equals(rmtNodeId0))
+ throw new HandshakeException("Remote node ID is not as expected [expected=" + rmtNodeId +
+ ", rcvd=" + rmtNodeId0 + ']');
+ else if (log.isDebugEnabled())
+ log.debug("Received remote node ID: " + rmtNodeId0);
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
- }
- else
- ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+ }
+ else
+ ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
- ClusterNode locNode = getLocalNode();
+ ClusterNode locNode = getLocalNode();
- if (locNode == null)
- throw new IgniteCheckedException("Local node has not been started or " +
- "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
+ if (locNode == null)
+ throw new IgniteCheckedException("Local node has not been started or " +
+ "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
- if (recovery != null) {
- HandshakeMessage msg;
+ if (recovery != null) {
+ HandshakeMessage msg;
- int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
+ int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
- if (handshakeConnIdx != null) {
- msg = new HandshakeMessage2(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received(),
- handshakeConnIdx);
+ if (handshakeConnIdx != null) {
+ msg = new HandshakeMessage2(locNode.id(),
+ recovery.incrementConnectCount(),
+ recovery.received(),
+ handshakeConnIdx);
- msgSize += 4;
- }
- else {
- msg = new HandshakeMessage(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received());
- }
+ msgSize += 4;
+ }
+ else {
+ msg = new HandshakeMessage(locNode.id(),
+ recovery.incrementConnectCount(),
+ recovery.received());
+ }
- if (log.isDebugEnabled())
- log.debug("Writing handshake message [locNodeId=" + locNode.id() +
- ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+ if (log.isDebugEnabled())
+ log.debug("Writing handshake message [locNodeId=" + locNode.id() +
+ ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
- buf = ByteBuffer.allocate(msgSize);
+ buf = ByteBuffer.allocate(msgSize);
- buf.order(ByteOrder.nativeOrder());
+ buf.order(ByteOrder.nativeOrder());
- boolean written = msg.writeTo(buf, null);
+ boolean written = msg.writeTo(buf, null);
- assert written;
+ assert written;
- buf.flip();
+ buf.flip();
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- ch.write(sslHnd.encrypt(buf));
- }
- else
- ch.write(buf);
- }
- else {
- if (isSslEnabled()) {
- assert sslHnd != null;
+ ch.write(sslHnd.encrypt(buf));
+ }
+ else
+ ch.write(buf);
+ }
+ else {
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
- }
- else
- ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
- }
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
+ }
+ else
+ ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
+ }
- if (recovery != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+ if (recovery != null) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- buf = ByteBuffer.allocate(1000);
- buf.order(ByteOrder.nativeOrder());
+ buf = ByteBuffer.allocate(1000);
+ buf.order(ByteOrder.nativeOrder());
- ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
- decode.order(ByteOrder.nativeOrder());
+ ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
+ decode.order(ByteOrder.nativeOrder());
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new HandshakeException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- buf.flip();
+ buf.flip();
- ByteBuffer decode0 = sslHnd.decode(buf);
+ ByteBuffer decode0 = sslHnd.decode(buf);
- i += decode0.remaining();
+ i += decode0.remaining();
- decode = appendAndResizeIfNeeded(decode, decode0);
+ decode = appendAndResizeIfNeeded(decode, decode0);
- buf.clear();
- }
+ buf.clear();
+ }
- decode.flip();
+ decode.flip();
- rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+ rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
- if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
- decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
+ decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
- sslMeta.decodedBuffer(decode);
- }
+ sslMeta.decodedBuffer(decode);
+ }
- ByteBuffer inBuf = sslHnd.inputBuffer();
+ ByteBuffer inBuf = sslHnd.inputBuffer();
- if (inBuf.position() > 0)
- sslMeta.encodedBuffer(inBuf);
- }
- else {
- buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ if (inBuf.position() > 0)
+ sslMeta.encodedBuffer(inBuf);
+ }
+ else {
+ buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
- buf.order(ByteOrder.nativeOrder());
+ buf.order(ByteOrder.nativeOrder());
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- if (read == -1)
- throw new HandshakeException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- i += read;
- }
+ i += read;
+ }
- rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
- }
+ rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
+ }
- if (log.isDebugEnabled())
- log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+ if (log.isDebugEnabled())
+ log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
- if (rcvCnt == -1) {
- if (log.isDebugEnabled())
- log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
- }
- else
- success = true;
- }
- else
- success = true;
- }
- catch (IOException e) {
+ if (rcvCnt == -1) {
if (log.isDebugEnabled())
- log.debug("Failed to read from channel: " + e);
-
- throw new IgniteCheckedException("Failed to read from channel.", e);
- }
- finally {
- if (!success)
- U.closeQuiet(ch);
+ log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
}
}
}
+ catch (IOException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to read from channel: " + e);
+
+ throw new IgniteCheckedException("Failed to read from channel.", e);
+ }
finally {
boolean cancelled = obj.cancel();