You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:36 UTC

[28/50] [abbrv] ignite git commit: Minor: moved inner TcpCommunicationSpi messages to top-level classes.

Minor: moved inner TcpCommunicationSpi messages to top-level classes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e358ae24
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e358ae24
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e358ae24

Branch: refs/heads/ignite-zk
Commit: e358ae241b334b6c03fc62ab0eee99f08eeb778a
Parents: c576d5a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 15 13:50:19 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 15 13:50:19 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   8 +-
 .../spi/communication/tcp/HandshakeMessage.java | 156 ------------------
 .../communication/tcp/HandshakeMessage2.java    |  95 -----------
 .../spi/communication/tcp/NodeIdMessage.java    | 115 --------------
 .../tcp/RecoveryLastReceivedMessage.java        | 113 -------------
 .../communication/tcp/TcpCommunicationSpi.java  |  31 ++--
 .../tcp/messages/HandshakeMessage.java          | 157 +++++++++++++++++++
 .../tcp/messages/HandshakeMessage2.java         |  95 +++++++++++
 .../tcp/messages/NodeIdMessage.java             | 128 +++++++++++++++
 .../messages/RecoveryLastReceivedMessage.java   | 114 ++++++++++++++
 .../tcp/messages/package-info.java              |  22 +++
 11 files changed, 541 insertions(+), 493 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 791dd91..78cb7a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -153,11 +153,11 @@ import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
-import org.apache.ignite.spi.communication.tcp.HandshakeMessage;
-import org.apache.ignite.spi.communication.tcp.HandshakeMessage2;
-import org.apache.ignite.spi.communication.tcp.NodeIdMessage;
-import org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
+import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
+import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
+import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
 import org.jsr166.ConcurrentHashMap8;
 
 /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
deleted file mode 100644
index 00e8e46..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.communication.tcp;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Handshake message.
- */
-public class HandshakeMessage implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Message body size in bytes. */
-    private static final int MESSAGE_SIZE = 32;
-
-    /** Full message size (with message type) in bytes. */
-    public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
-    /** */
-    private UUID nodeId;
-
-    /** */
-    private long rcvCnt;
-
-    /** */
-    private long connectCnt;
-
-    /**
-     * Default constructor required by {@link Message}.
-     */
-    public HandshakeMessage() {
-        // No-op.
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param connectCnt Connect count.
-     * @param rcvCnt Number of received messages.
-     */
-    public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
-        assert nodeId != null;
-        assert rcvCnt >= 0 : rcvCnt;
-
-        this.nodeId = nodeId;
-        this.connectCnt = connectCnt;
-        this.rcvCnt = rcvCnt;
-    }
-
-    /**
-     * @return Connection index.
-     */
-    public int connectionIndex() {
-        return 0;
-    }
-
-    /**
-     * @return Connect count.
-     */
-    public long connectCount() {
-        return connectCnt;
-    }
-
-    /**
-     * @return Number of received messages.
-     */
-    public long received() {
-        return rcvCnt;
-    }
-
-    /**
-     * @return Node ID.
-     */
-    public UUID nodeId() {
-        return nodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        if (buf.remaining() < MESSAGE_FULL_SIZE)
-            return false;
-
-        TcpCommunicationSpi.writeMessageType(buf, directType());
-
-        byte[] bytes = U.uuidToBytes(nodeId);
-
-        assert bytes.length == 16 : bytes.length;
-
-        buf.put(bytes);
-
-        buf.putLong(rcvCnt);
-
-        buf.putLong(connectCnt);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        if (buf.remaining() < MESSAGE_SIZE)
-            return false;
-
-        byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE];
-
-        buf.get(nodeIdBytes);
-
-        nodeId = U.bytesToUuid(nodeIdBytes, 0);
-
-        rcvCnt = buf.getLong();
-
-        connectCnt = buf.getLong();
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return TcpCommunicationSpi.HANDSHAKE_MSG_TYPE;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HandshakeMessage.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
deleted file mode 100644
index 1e8fdd9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.communication.tcp;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Updated handshake message.
- */
-public class HandshakeMessage2 extends HandshakeMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private int connIdx;
-
-    /**
-     *
-     */
-    public HandshakeMessage2() {
-        // No-op.
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param connectCnt Connect count.
-     * @param rcvCnt Number of received messages.
-     * @param connIdx Connection index.
-     */
-    HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
-        super(nodeId, connectCnt, rcvCnt);
-
-        this.connIdx = connIdx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return -44;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int connectionIndex() {
-        return connIdx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (buf.remaining() < 4)
-            return false;
-
-        buf.putInt(connIdx);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        if (buf.remaining() < 4)
-            return false;
-
-        connIdx = buf.getInt();
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HandshakeMessage2.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
deleted file mode 100644
index d05b7ff..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.communication.tcp;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Node ID message.
- */
-public class NodeIdMessage implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Message body size (with message type) in bytes. */
-    static final int MESSAGE_SIZE = 16;
-
-    /** Full message size (with message type) in bytes. */
-    public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
-    /** */
-    byte[] nodeIdBytes;
-
-    /** */
-    byte[] nodeIdBytesWithType;
-
-    /** */
-    public NodeIdMessage() {
-        // No-op.
-    }
-
-    /**
-     * @param nodeId Node ID.
-     */
-    NodeIdMessage(UUID nodeId) {
-        assert nodeId != null;
-
-        nodeIdBytes = U.uuidToBytes(nodeId);
-
-        assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE;
-
-        nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE];
-
-        nodeIdBytesWithType[0] = (byte)(TcpCommunicationSpi.NODE_ID_MSG_TYPE & 0xFF);
-        nodeIdBytesWithType[1] = (byte)((TcpCommunicationSpi.NODE_ID_MSG_TYPE >> 8) & 0xFF);
-
-        System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 2, nodeIdBytes.length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        assert nodeIdBytes.length == MESSAGE_SIZE;
-
-        if (buf.remaining() < MESSAGE_FULL_SIZE)
-            return false;
-
-        TcpCommunicationSpi.writeMessageType(buf, directType());
-
-        buf.put(nodeIdBytes);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        if (buf.remaining() < MESSAGE_SIZE)
-            return false;
-
-        nodeIdBytes = new byte[MESSAGE_SIZE];
-
-        buf.get(nodeIdBytes);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return TcpCommunicationSpi.NODE_ID_MSG_TYPE;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(NodeIdMessage.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
deleted file mode 100644
index 5460084..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.communication.tcp;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Recovery acknowledgment message.
- */
-public class RecoveryLastReceivedMessage implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    static final long ALREADY_CONNECTED = -1;
-
-    /** */
-    static final long NODE_STOPPING = -2;
-
-    /** Need wait. */
-    static final long NEED_WAIT = -3;
-
-    /** Message body size in bytes. */
-    private static final int MESSAGE_SIZE = 8;
-
-    /** Full message size (with message type) in bytes. */
-    public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
-    /** */
-    private long rcvCnt;
-
-    /**
-     * Default constructor required by {@link Message}.
-     */
-    public RecoveryLastReceivedMessage() {
-        // No-op.
-    }
-
-    /**
-     * @param rcvCnt Number of received messages.
-     */
-    public RecoveryLastReceivedMessage(long rcvCnt) {
-        this.rcvCnt = rcvCnt;
-    }
-
-    /**
-     * @return Number of received messages.
-     */
-    public long received() {
-        return rcvCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        if (buf.remaining() < MESSAGE_FULL_SIZE)
-            return false;
-
-        TcpCommunicationSpi.writeMessageType(buf, directType());
-
-        buf.putLong(rcvCnt);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        if (buf.remaining() < MESSAGE_SIZE)
-            return false;
-
-        rcvCnt = buf.getLong();
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(RecoveryLastReceivedMessage.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1f0061f..69da9ca 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -132,6 +132,10 @@ import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
+import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
+import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
+import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
@@ -141,9 +145,9 @@ import org.jsr166.ConcurrentLinkedDeque8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
-import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
-import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NEED_WAIT;
-import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NODE_STOPPING;
+import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
+import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT;
+import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING;
 
 /**
  * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -456,7 +460,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 ConnectionKey connKey;
 
                 if (msg instanceof NodeIdMessage) {
-                    sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
+                    sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0);
                     connKey = new ConnectionKey(sndId, 0, -1);
                 }
                 else {
@@ -3550,10 +3554,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 if (isSslEnabled()) {
                     assert sslHnd != null;
 
-                    ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
+                    ch.write(sslHnd.encrypt(ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId()))));
                 }
                 else
-                    ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
+                    ch.write(ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId())));
             }
 
             if (recovery != null) {
@@ -3806,20 +3810,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @return Node ID message.
      */
     private NodeIdMessage nodeIdMessage() {
+        return new NodeIdMessage(safeLocalNodeId());
+    }
+
+    /**
+     * @return Local node ID.
+     */
+    private UUID safeLocalNodeId() {
         ClusterNode locNode = getLocalNode();
 
         UUID id;
 
         if (locNode == null) {
             U.warn(log, "Local node is not started or fully initialized [isStopping=" +
-                    getSpiContext().isStopping() + ']');
+                getSpiContext().isStopping() + ']');
 
             id = new UUID(0, 0);
         }
         else
             id = locNode.id();
 
-        return new NodeIdMessage(id);
+        return id;
     }
 
     /** {@inheritDoc} */
@@ -3923,7 +3934,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @param buf Byte buffer.
      * @param type Message type.
      */
-    static void writeMessageType(ByteBuffer buf, short type) {
+    public static void writeMessageType(ByteBuffer buf, short type) {
         buf.put((byte)(type & 0xFF));
         buf.put((byte)((type >> 8) & 0xFF));
     }
@@ -4426,7 +4437,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                 out.write(U.IGNITE_HEADER);
                 writeMessageType(out, NODE_ID_MSG_TYPE);
-                out.write(msg.nodeIdBytes);
+                out.write(msg.nodeIdBytes());
 
                 out.flush();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
new file mode 100644
index 0000000..e3be9c9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.messages;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Handshake message.
+ */
+public class HandshakeMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Message body size in bytes. */
+    private static final int MESSAGE_SIZE = 32;
+
+    /** Full message size (with message type) in bytes. */
+    public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+    /** */
+    private UUID nodeId;
+
+    /** */
+    private long rcvCnt;
+
+    /** */
+    private long connectCnt;
+
+    /**
+     * Default constructor required by {@link Message}.
+     */
+    public HandshakeMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param connectCnt Connect count.
+     * @param rcvCnt Number of received messages.
+     */
+    public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
+        assert nodeId != null;
+        assert rcvCnt >= 0 : rcvCnt;
+
+        this.nodeId = nodeId;
+        this.connectCnt = connectCnt;
+        this.rcvCnt = rcvCnt;
+    }
+
+    /**
+     * @return Connection index.
+     */
+    public int connectionIndex() {
+        return 0;
+    }
+
+    /**
+     * @return Connect count.
+     */
+    public long connectCount() {
+        return connectCnt;
+    }
+
+    /**
+     * @return Number of received messages.
+     */
+    public long received() {
+        return rcvCnt;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        if (buf.remaining() < MESSAGE_FULL_SIZE)
+            return false;
+
+        TcpCommunicationSpi.writeMessageType(buf, directType());
+
+        byte[] bytes = U.uuidToBytes(nodeId);
+
+        assert bytes.length == 16 : bytes.length;
+
+        buf.put(bytes);
+
+        buf.putLong(rcvCnt);
+
+        buf.putLong(connectCnt);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (buf.remaining() < MESSAGE_SIZE)
+            return false;
+
+        byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE];
+
+        buf.get(nodeIdBytes);
+
+        nodeId = U.bytesToUuid(nodeIdBytes, 0);
+
+        rcvCnt = buf.getLong();
+
+        connectCnt = buf.getLong();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TcpCommunicationSpi.HANDSHAKE_MSG_TYPE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HandshakeMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
new file mode 100644
index 0000000..f27a825
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.messages;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Updated handshake message.
+ */
+public class HandshakeMessage2 extends HandshakeMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private int connIdx;
+
+    /**
+     *
+     */
+    public HandshakeMessage2() {
+        // No-op.
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param connectCnt Connect count.
+     * @param rcvCnt Number of received messages.
+     * @param connIdx Connection index.
+     */
+    public HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
+        super(nodeId, connectCnt, rcvCnt);
+
+        this.connIdx = connIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return -44;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int connectionIndex() {
+        return connIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (buf.remaining() < 4)
+            return false;
+
+        buf.putInt(connIdx);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        if (buf.remaining() < 4)
+            return false;
+
+        connIdx = buf.getInt();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HandshakeMessage2.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
new file mode 100644
index 0000000..2c6aa30
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.messages;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Node ID message.
+ */
+public class NodeIdMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Message body size (with message type) in bytes. */
+    static final int MESSAGE_SIZE = 16;
+
+    /** Full message size (with message type) in bytes. */
+    public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+    /** */
+    private byte[] nodeIdBytes;
+
+    /** */
+    public NodeIdMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param nodeId Node ID.
+     */
+    public NodeIdMessage(UUID nodeId) {
+        assert nodeId != null;
+
+        nodeIdBytes = U.uuidToBytes(nodeId);
+
+        assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE;
+    }
+
+    /**
+     * @return Node ID bytes.
+     */
+    public byte[] nodeIdBytes() {
+        return nodeIdBytes;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return Marshalled node ID bytes with direct message type.
+     */
+    public static byte[] nodeIdBytesWithType(UUID nodeId) {
+        byte[] nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE];
+
+        nodeIdBytesWithType[0] = (byte)(TcpCommunicationSpi.NODE_ID_MSG_TYPE & 0xFF);
+        nodeIdBytesWithType[1] = (byte)((TcpCommunicationSpi.NODE_ID_MSG_TYPE >> 8) & 0xFF);
+
+        U.uuidToBytes(nodeId, nodeIdBytesWithType, 2);
+
+        return nodeIdBytesWithType;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        assert nodeIdBytes.length == MESSAGE_SIZE;
+
+        if (buf.remaining() < MESSAGE_FULL_SIZE)
+            return false;
+
+        TcpCommunicationSpi.writeMessageType(buf, directType());
+
+        buf.put(nodeIdBytes);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (buf.remaining() < MESSAGE_SIZE)
+            return false;
+
+        nodeIdBytes = new byte[MESSAGE_SIZE];
+
+        buf.get(nodeIdBytes);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TcpCommunicationSpi.NODE_ID_MSG_TYPE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(NodeIdMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
new file mode 100644
index 0000000..eef2655
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.messages;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Recovery acknowledgment message.
+ */
+public class RecoveryLastReceivedMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final long ALREADY_CONNECTED = -1;
+
+    /** */
+    public static final long NODE_STOPPING = -2;
+
+    /** Need wait. */
+    public static final long NEED_WAIT = -3;
+
+    /** Message body size in bytes. */
+    private static final int MESSAGE_SIZE = 8;
+
+    /** Full message size (with message type) in bytes. */
+    public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+    /** */
+    private long rcvCnt;
+
+    /**
+     * Default constructor required by {@link Message}.
+     */
+    public RecoveryLastReceivedMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param rcvCnt Number of received messages.
+     */
+    public RecoveryLastReceivedMessage(long rcvCnt) {
+        this.rcvCnt = rcvCnt;
+    }
+
+    /**
+     * @return Number of received messages.
+     */
+    public long received() {
+        return rcvCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        if (buf.remaining() < MESSAGE_FULL_SIZE)
+            return false;
+
+        TcpCommunicationSpi.writeMessageType(buf, directType());
+
+        buf.putLong(rcvCnt);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (buf.remaining() < MESSAGE_SIZE)
+            return false;
+
+        rcvCnt = buf.getLong();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(RecoveryLastReceivedMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java
new file mode 100644
index 0000000..662dd26
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains implementation messages.
+ */
+package org.apache.ignite.spi.communication.tcp.messages;
\ No newline at end of file