You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2019/02/14 13:53:48 UTC
[ignite] 01/02: IGNITE-10748 Remove dead code in
TcpCommunicationSpi for the tcp client creation flow. - Fixes #5711.
This is an automated email from the ASF dual-hosted git repository.
dpavlov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 68c178ef111bb02fb62b7c969c704c58d272cf04
Author: Maxim Muzafarov <ma...@gmail.com>
AuthorDate: Thu Feb 14 16:51:13 2019 +0300
IGNITE-10748 Remove dead code in TcpCommunicationSpi for the tcp client creation flow. - Fixes #5711.
Signed-off-by: Dmitriy Pavlov <dp...@apache.org>
---
.../spi/communication/tcp/TcpCommunicationSpi.java | 276 ++++++++++-----------
.../tcp/internal/HandshakeException.java | 35 +++
.../tcp/messages/HandshakeMessage.java | 7 +
.../tcp/messages/HandshakeMessage2.java | 8 +
.../zk/internal/ZookeeperDiscoverySpiTestBase.java | 2 +-
5 files changed, 185 insertions(+), 143 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 243f707..de029f7 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -143,6 +144,7 @@ import org.apache.ignite.spi.TimeoutStrategy;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
+import org.apache.ignite.spi.communication.tcp.internal.HandshakeException;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
@@ -2915,7 +2917,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
curClients0[connIdx] : null;
if (client0 == null) {
- client0 = createNioClient(node, connIdx);
+ client0 = createCommunicationClient(node, connIdx);
if (client0 != null) {
addNodeClient(node, connIdx, client0);
@@ -2989,7 +2991,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx)
+ @Nullable private GridCommunicationClient createCommunicationClient(ClusterNode node, int connIdx)
throws IgniteCheckedException {
assert node != null;
@@ -3275,9 +3277,45 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @throws IgniteCheckedException If failed.
*/
protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+ GridNioSession session = createNioSession(node, connIdx);
+
+ return session == null ?
+ null : new GridTcpNioCommunicationClient(connIdx, session, log);
+ }
+
+ /**
+ * Returns an established TCP/IP connection between the current node and the remote node. A handshake
+ * negotiation between the two communicating nodes is performed before the {@link GridNioSession} is created.
+ * <p>
+ * The handshaking process consists of these steps:
+ *
+ * <ol>
+ * <li>The local node opens a new {@link SocketChannel} in the <em>blocking</em> mode.</li>
+ * <li>The local node calls {@link SocketChannel#connect(SocketAddress)} to remote node.</li>
+ * <li>The remote GridNioAcceptWorker thread accepts new connection.</li>
+ * <li>The remote node sends back the {@link NodeIdMessage}.</li>
+ * <li>The local node reads NodeIdMessage from created channel.</li>
+ * <li>The local node sends the {@link HandshakeMessage2} to remote.</li>
+ * <li>The remote node processes {@link HandshakeMessage2} in {@link GridNioServerListener#onMessage(GridNioSession, Object)}.</li>
+ * <li>The remote node sends back the {@link RecoveryLastReceivedMessage}.</li>
+ * </ol>
+ *
+ * The handshaking process ends.
+ * </p>
+ * <p>
+ * <em>Note.</em> The {@link HandshakeTimeoutObject} is created to control execution timeout during the
+ * whole handshaking process.
+ * </p>
+ *
+ * @param node Remote node to connect with.
+ * @param connIdx Connection index based on configured {@link ConnectionPolicy}.
+ * @return A {@link GridNioSession} for the established connection, or {@code null} (e.g. when the handshake reports {@code ALREADY_CONNECTED}).
+ * @throws IgniteCheckedException If the connection cannot be established.
+ */
+ private GridNioSession createNioSession(ClusterNode node, int connIdx) throws IgniteCheckedException {
Collection<InetSocketAddress> addrs = nodeAddresses(node);
- GridCommunicationClient client = null;
+ GridNioSession session = null;
IgniteCheckedException errs = null;
long totalTimeout;
@@ -3299,7 +3337,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
maxConnTimeout
);
- while (client == null) { // Reconnection on handshake timeout.
+ while (session == null) { // Reconnection on handshake timeout.
if (stopping)
throw new IgniteSpiException("Node is stopping.");
@@ -3332,6 +3370,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
+ assert recoveryDesc != null :
+ "Recovery descriptor not found [connKey=" + connKey + ", rmtNode=" + node.id() + ']';
+
if (!recoveryDesc.reserve()) {
U.closeQuiet(ch);
@@ -3370,14 +3411,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
sslMeta.sslEngine(sslEngine);
}
- Integer handshakeConnIdx = connIdx;
+ ClusterNode locNode = getLocalNode();
+
+ if (locNode == null)
+ throw new IgniteCheckedException("Local node has not been started or " +
+ "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
rcvCnt = safeTcpHandshake(ch,
- recoveryDesc,
node.id(),
connTimeoutStgy.nextTimeout(currTimeout),
sslMeta,
- handshakeConnIdx);
+ new HandshakeMessage2(locNode.id(),
+ recoveryDesc.incrementConnectCount(),
+ recoveryDesc.received(),
+ connIdx));
if (rcvCnt == ALREADY_CONNECTED)
return null;
@@ -3416,20 +3463,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
throw new IgniteCheckedException("Unsupported negative receivedCount [rcvCnt=" + rcvCnt +
", senderNode=" + node + ']');
- meta.put(CONN_IDX_META, connKey);
-
- if (recoveryDesc != null) {
- recoveryDesc.onHandshake(rcvCnt);
-
- meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);
- }
+ recoveryDesc.onHandshake(rcvCnt);
- GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get();
+ meta.put(CONN_IDX_META, connKey);
+ meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc);
- client = new GridTcpNioCommunicationClient(connIdx, ses, log);
+ session = nioSrvr.createSession(ch, meta, false, null).get();
}
finally {
- if (client == null) {
+ if (session == null) {
U.closeQuiet(ch);
if (recoveryDesc != null)
@@ -3438,10 +3480,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
catch (IgniteSpiOperationTimeoutException e) { // Handshake is timed out.
- if (client != null) {
- client.forceClose();
+ if (session != null) {
+ session.close();
- client = null;
+ session = null;
}
onException("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
@@ -3477,10 +3519,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
catch (Exception e) {
// Most probably IO error on socket connect or handshake.
- if (client != null) {
- client.forceClose();
+ if (session != null) {
+ session.close();
- client = null;
+ session = null;
}
onException("Client creation failed [addr=" + addr + ", err=" + e + ']', e);
@@ -3527,14 +3569,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
- if (client != null)
+ if (session != null)
break;
}
- if (client == null)
- processClientCreationError(node, addrs, errs == null ? new IgniteCheckedException("No clients found") : errs);
+ if (session == null)
+ processSessionCreationError(node, addrs, errs == null ? new IgniteCheckedException("No session found") : errs);
- return client;
+ return session;
}
/**
@@ -3552,14 +3594,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/**
- * Process errors if TCP client to remote node hasn't been created.
+ * Process errors if TCP/IP {@link GridNioSession} creation to remote node hasn't been performed.
*
* @param node Remote node.
* @param addrs Remote node addresses.
* @param errs TCP client creation errors.
* @throws IgniteCheckedException If failed.
*/
- protected void processClientCreationError(
+ protected void processSessionCreationError(
ClusterNode node,
Collection<InetSocketAddress> addrs,
IgniteCheckedException errs
@@ -3654,34 +3696,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* Performs handshake in timeout-safe way.
*
* @param ch Socket channel.
- * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
* @param rmtNodeId Remote node.
* @param timeout Timeout for handshake.
* @param sslMeta Session meta.
- * @param handshakeConnIdx Non null connection index if need send it in handshake.
+ * @param msg {@link HandshakeMessage} or {@link HandshakeMessage2} to send.
* @return Handshake response.
* @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
private long safeTcpHandshake(
SocketChannel ch,
- @Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId,
long timeout,
GridSslMeta sslMeta,
- @Nullable Integer handshakeConnIdx
+ HandshakeMessage msg
) throws IgniteCheckedException {
HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(ch, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
- long rcvCnt = 0;
+ long rcvCnt;
try {
BlockingSslHandler sslHnd = null;
ByteBuffer buf;
+ // Step 1. Read the remote node's initial response carrying its node ID.
if (isSslEnabled()) {
assert sslMeta != null;
@@ -3757,134 +3798,98 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
else
U.writeFully(ch, ByteBuffer.wrap(U.IGNITE_HEADER));
- ClusterNode locNode = getLocalNode();
-
- if (locNode == null)
- throw new IgniteCheckedException("Local node has not been started or " +
- "fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
-
- if (recovery != null) {
- HandshakeMessage msg;
-
- int msgSize = HandshakeMessage.MESSAGE_FULL_SIZE;
-
- if (handshakeConnIdx != null) {
- msg = new HandshakeMessage2(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received(),
- handshakeConnIdx);
-
- msgSize += 4;
- }
- else {
- msg = new HandshakeMessage(locNode.id(),
- recovery.incrementConnectCount(),
- recovery.received());
- }
-
- if (log.isDebugEnabled())
- log.debug("Writing handshake message [locNodeId=" + locNode.id() +
- ", rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
+ // Step 2. Serialize the handshake message and send it to the remote node.
+ if (log.isDebugEnabled())
+ log.debug("Writing handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
- buf = ByteBuffer.allocate(msgSize);
+ buf = ByteBuffer.allocate(msg.getMessageSize());
- buf.order(ByteOrder.nativeOrder());
+ buf.order(ByteOrder.nativeOrder());
- boolean written = msg.writeTo(buf, null);
+ boolean written = msg.writeTo(buf, null);
- assert written;
+ assert written;
- buf.flip();
+ buf.flip();
- if (isSslEnabled()) {
- assert sslHnd != null;
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- U.writeFully(ch, sslHnd.encrypt(buf));
- }
- else
- U.writeFully(ch, buf);
+ U.writeFully(ch, sslHnd.encrypt(buf));
}
- else {
- if (isSslEnabled()) {
- assert sslHnd != null;
+ else
+ U.writeFully(ch, buf);
- U.writeFully(ch, sslHnd.encrypt(ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId()))));
- }
- else
- U.writeFully(ch, ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId())));
- }
+ if (log.isDebugEnabled())
+ log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
- if (recovery != null) {
- if (log.isDebugEnabled())
- log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+ // Step 3. Wait for the remote node's response carrying its received-messages count.
+ if (isSslEnabled()) {
+ assert sslHnd != null;
- if (isSslEnabled()) {
- assert sslHnd != null;
+ buf = ByteBuffer.allocate(1000);
+ buf.order(ByteOrder.nativeOrder());
- buf = ByteBuffer.allocate(1000);
- buf.order(ByteOrder.nativeOrder());
+ ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
+ decode.order(ByteOrder.nativeOrder());
- ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
- decode.order(ByteOrder.nativeOrder());
+ for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- if (read == -1)
- throw new HandshakeException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ buf.flip();
- buf.flip();
+ ByteBuffer decode0 = sslHnd.decode(buf);
- ByteBuffer decode0 = sslHnd.decode(buf);
+ i += decode0.remaining();
- i += decode0.remaining();
+ decode = appendAndResizeIfNeeded(decode, decode0);
- decode = appendAndResizeIfNeeded(decode, decode0);
+ buf.clear();
+ }
- buf.clear();
- }
+ decode.flip();
- decode.flip();
+ rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
- rcvCnt = decode.getLong(Message.DIRECT_TYPE_SIZE);
+ if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
+ decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
- if (decode.limit() > RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE) {
- decode.position(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ sslMeta.decodedBuffer(decode);
+ }
- sslMeta.decodedBuffer(decode);
- }
+ ByteBuffer inBuf = sslHnd.inputBuffer();
- ByteBuffer inBuf = sslHnd.inputBuffer();
+ if (inBuf.position() > 0)
+ sslMeta.encodedBuffer(inBuf);
+ }
+ else {
+ buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
- if (inBuf.position() > 0)
- sslMeta.encodedBuffer(inBuf);
- }
- else {
- buf = ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE);
+ buf.order(ByteOrder.nativeOrder());
- buf.order(ByteOrder.nativeOrder());
+ for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+ int read = ch.read(buf);
- for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
- int read = ch.read(buf);
+ if (read == -1)
+ throw new HandshakeException("Failed to read remote node recovery handshake " +
+ "(connection closed).");
- if (read == -1)
- throw new HandshakeException("Failed to read remote node recovery handshake " +
- "(connection closed).");
+ i += read;
+ }
- i += read;
- }
+ rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
+ }
- rcvCnt = buf.getLong(Message.DIRECT_TYPE_SIZE);
- }
+ if (log.isDebugEnabled())
+ log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+ if (rcvCnt == -1) {
if (log.isDebugEnabled())
- log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
-
- if (rcvCnt == -1) {
- if (log.isDebugEnabled())
- log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
- }
+ log.debug("Connection rejected, will retry client creation [rmtNode=" + rmtNodeId + ']');
}
}
catch (IOException e) {
@@ -4111,19 +4116,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
return S.toString(TcpCommunicationSpi.class, this);
}
- /** Internal exception class for proper timeout handling. */
- private static class HandshakeException extends IgniteCheckedException {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param msg Error message.
- */
- HandshakeException(String msg) {
- super(msg);
- }
- }
-
/**
* This worker takes responsibility to shut the server down when stopping,
* No other thread shall stop passed server.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/HandshakeException.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/HandshakeException.java
new file mode 100644
index 0000000..f5ffce8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/HandshakeException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Internal exception thrown when a communication handshake fails (e.g. the connection is closed mid-handshake).
+ */
+public class HandshakeException extends IgniteCheckedException {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param msg Error message describing the handshake failure.
+ */
+ public HandshakeException(String msg) {
+ super(msg);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
index f845b0b..0b76b74 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
@@ -99,6 +99,13 @@ public class HandshakeMessage implements Message {
return nodeId;
}
+ /**
+ * @return Serialized size of this message in bytes; subclasses that add fields override this.
+ */
+ public int getMessageSize() {
+ return MESSAGE_FULL_SIZE;
+ }
+
/** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
index 2207813..026d9a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
@@ -32,6 +32,9 @@ public class HandshakeMessage2 extends HandshakeMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** Message size in bytes including {@link HandshakeMessage} fields. */
+ public static final int HANDSHAKE2_MESSAGE_SIZE = MESSAGE_FULL_SIZE + 4;
+
/** */
private int connIdx;
@@ -65,6 +68,11 @@ public class HandshakeMessage2 extends HandshakeMessage {
}
/** {@inheritDoc} */
+ @Override public int getMessageSize() {
+ return HANDSHAKE2_MESSAGE_SIZE; // Base HandshakeMessage size plus 4 bytes for the int connIdx field.
+ }
+
+ /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
if (!super.writeTo(buf, writer))
return false;
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
index ccacbc1..38147e1 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
@@ -616,7 +616,7 @@ class ZookeeperDiscoverySpiTestBase extends GridCommonAbstractTest {
int connIdx
) throws IgniteCheckedException {
if (failure && !matrix.hasConnection(getLocalNode(), node)) {
- processClientCreationError(node, null, new IgniteCheckedException("Test", new SocketTimeoutException()));
+ processSessionCreationError(node, null, new IgniteCheckedException("Test", new SocketTimeoutException()));
return null;
}