You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:34 UTC

[26/50] [abbrv] ignite git commit: Minor: moved inner TcpCommunicationSpi messages to top-level classes.

Minor: moved inner TcpCommunicationSpi messages to top-level classes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c576d5a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c576d5a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c576d5a0

Branch: refs/heads/ignite-zk
Commit: c576d5a0ab1bbaab4a1b23a2c03a21ef2a7b610c
Parents: 061ec6a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 15 11:28:18 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 15 11:28:18 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  14 +-
 .../spi/communication/tcp/HandshakeMessage.java | 156 ++++++++
 .../communication/tcp/HandshakeMessage2.java    |  95 +++++
 .../spi/communication/tcp/NodeIdMessage.java    | 115 ++++++
 .../tcp/RecoveryLastReceivedMessage.java        | 113 ++++++
 .../communication/tcp/TcpCommunicationSpi.java  | 391 +------------------
 6 files changed, 492 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 97e06bf..791dd91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -137,11 +137,11 @@ import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse;
 import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage;
 import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
 import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
-import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
 import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest;
 import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
 import org.apache.ignite.internal.util.GridByteArrayList;
@@ -153,6 +153,10 @@ import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
+import org.apache.ignite.spi.communication.tcp.HandshakeMessage;
+import org.apache.ignite.spi.communication.tcp.HandshakeMessage2;
+import org.apache.ignite.spi.communication.tcp.NodeIdMessage;
+import org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -221,7 +225,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case -44:
-                msg = new TcpCommunicationSpi.HandshakeMessage2();
+                msg = new HandshakeMessage2();
 
                 break;
 
@@ -291,17 +295,17 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case TcpCommunicationSpi.NODE_ID_MSG_TYPE:
-                msg = new TcpCommunicationSpi.NodeIdMessage();
+                msg = new NodeIdMessage();
 
                 break;
 
             case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE:
-                msg = new TcpCommunicationSpi.RecoveryLastReceivedMessage();
+                msg = new RecoveryLastReceivedMessage();
 
                 break;
 
             case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE:
-                msg = new TcpCommunicationSpi.HandshakeMessage();
+                msg = new HandshakeMessage();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
new file mode 100644
index 0000000..00e8e46
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Handshake message carrying a node ID, a received-messages counter and a connect counter.
+ */
+public class HandshakeMessage implements Message {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message body size in bytes: 16-byte node ID followed by two 8-byte counters. */
+    private static final int MESSAGE_SIZE = 32;
+
+    /** Full message size (with message type) in bytes. */
+    public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+    /** Node ID. */
+    private UUID nodeId;
+
+    /** Number of received messages. */
+    private long rcvCnt;
+
+    /** Connect count. */
+    private long connectCnt;
+
+    /**
+     * Default constructor required by {@link Message}.
+     */
+    public HandshakeMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param nodeId Node ID (asserted to be non-null).
+     * @param connectCnt Connect count.
+     * @param rcvCnt Number of received messages (asserted to be non-negative).
+     */
+    public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
+        assert nodeId != null;
+        assert rcvCnt >= 0 : rcvCnt;
+
+        this.nodeId = nodeId;
+        this.connectCnt = connectCnt;
+        this.rcvCnt = rcvCnt;
+    }
+
+    /**
+     * @return Connection index; this implementation always returns {@code 0}.
+     */
+    public int connectionIndex() {
+        return 0;
+    }
+
+    /**
+     * @return Connect count.
+     */
+    public long connectCount() {
+        return connectCnt;
+    }
+
+    /**
+     * @return Number of received messages.
+     */
+    public long received() {
+        return rcvCnt;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} Writes nothing unless {@link #MESSAGE_FULL_SIZE} bytes remain in {@code buf}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        if (buf.remaining() < MESSAGE_FULL_SIZE)
+            return false;
+
+        TcpCommunicationSpi.writeMessageType(buf, directType());
+
+        byte[] bytes = U.uuidToBytes(nodeId);
+
+        assert bytes.length == 16 : bytes.length;
+
+        buf.put(bytes);
+
+        buf.putLong(rcvCnt); // Written before connectCnt; readFrom() reads in the same order.
+
+        buf.putLong(connectCnt);
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads the 32-byte body only; the message type is not read here. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (buf.remaining() < MESSAGE_SIZE)
+            return false;
+
+        byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE]; // 16 bytes, the size of a UUID.
+
+        buf.get(nodeIdBytes);
+
+        nodeId = U.bytesToUuid(nodeIdBytes, 0);
+
+        rcvCnt = buf.getLong();
+
+        connectCnt = buf.getLong();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TcpCommunicationSpi.HANDSHAKE_MSG_TYPE;
+    }
+
+    /** {@inheritDoc} Not supported by this message: always throws. */
+    @Override public byte fieldsCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HandshakeMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
new file mode 100644
index 0000000..1e8fdd9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Updated handshake message that additionally carries a connection index.
+ */
+public class HandshakeMessage2 extends HandshakeMessage {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Connection index. */
+    private int connIdx;
+
+    /**
+     * Default constructor.
+     */
+    public HandshakeMessage2() {
+        // No-op.
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param connectCnt Connect count.
+     * @param rcvCnt Number of received messages.
+     * @param connIdx Connection index.
+     */
+    HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
+        super(nodeId, connectCnt, rcvCnt);
+
+        this.connIdx = connIdx;
+    }
+
+    /** {@inheritDoc} Returns the literal {@code -44} rather than a named constant. */
+    @Override public short directType() {
+        return -44;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int connectionIndex() {
+        return connIdx;
+    }
+
+    /** {@inheritDoc} Appends the 4-byte connection index after the parent's data. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (buf.remaining() < 4) // NOTE(review): parent has already written here; confirm callers size the buffer.
+            return false;
+
+        buf.putInt(connIdx);
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads the parent's body and then the 4-byte connection index. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        if (buf.remaining() < 4) // NOTE(review): parent has already consumed its bytes; confirm retry handling.
+            return false;
+
+        connIdx = buf.getInt();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HandshakeMessage2.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
new file mode 100644
index 0000000..d05b7ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Node ID message: a node ID transferred as 16 raw bytes.
+ */
+public class NodeIdMessage implements Message {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message body size (without message type) in bytes. */
+    static final int MESSAGE_SIZE = 16;
+
+    /** Full message size (with message type) in bytes. */
+    public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+    /** Node ID bytes. */
+    byte[] nodeIdBytes;
+
+    /** Message type (2 bytes, low byte first) followed by the node ID bytes. */
+    byte[] nodeIdBytesWithType;
+
+    /** Default constructor; leaves both byte arrays {@code null}. */
+    public NodeIdMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param nodeId Node ID (asserted to be non-null).
+     */
+    NodeIdMessage(UUID nodeId) {
+        assert nodeId != null;
+
+        nodeIdBytes = U.uuidToBytes(nodeId);
+
+        assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE;
+
+        nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE];
+
+        nodeIdBytesWithType[0] = (byte)(TcpCommunicationSpi.NODE_ID_MSG_TYPE & 0xFF);
+        nodeIdBytesWithType[1] = (byte)((TcpCommunicationSpi.NODE_ID_MSG_TYPE >> 8) & 0xFF);
+
+        System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 2, nodeIdBytes.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} Writes nothing unless {@link #MESSAGE_FULL_SIZE} bytes remain in {@code buf}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        assert nodeIdBytes.length == MESSAGE_SIZE;
+
+        if (buf.remaining() < MESSAGE_FULL_SIZE)
+            return false;
+
+        TcpCommunicationSpi.writeMessageType(buf, directType());
+
+        buf.put(nodeIdBytes);
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads only the body into {@code nodeIdBytes}; {@code nodeIdBytesWithType} is not set. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (buf.remaining() < MESSAGE_SIZE)
+            return false;
+
+        nodeIdBytes = new byte[MESSAGE_SIZE];
+
+        buf.get(nodeIdBytes);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TcpCommunicationSpi.NODE_ID_MSG_TYPE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(NodeIdMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
new file mode 100644
index 0000000..5460084
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Recovery acknowledgment message carrying a single received-messages counter.
+ */
+public class RecoveryLastReceivedMessage implements Message {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Already connected (presumably a special {@link #received()} value; confirm at use sites). */
+    static final long ALREADY_CONNECTED = -1;
+
+    /** Node stopping (presumably a special {@link #received()} value; confirm at use sites). */
+    static final long NODE_STOPPING = -2;
+
+    /** Need wait. */
+    static final long NEED_WAIT = -3;
+
+    /** Message body size in bytes (one {@code long}). */
+    private static final int MESSAGE_SIZE = 8;
+
+    /** Full message size (with message type) in bytes. */
+    public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+    /** Number of received messages. */
+    private long rcvCnt;
+
+    /**
+     * Default constructor required by {@link Message}.
+     */
+    public RecoveryLastReceivedMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param rcvCnt Number of received messages.
+     */
+    public RecoveryLastReceivedMessage(long rcvCnt) {
+        this.rcvCnt = rcvCnt;
+    }
+
+    /**
+     * @return Number of received messages.
+     */
+    public long received() {
+        return rcvCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} Writes nothing unless {@link #MESSAGE_FULL_SIZE} bytes remain in {@code buf}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        if (buf.remaining() < MESSAGE_FULL_SIZE)
+            return false;
+
+        TcpCommunicationSpi.writeMessageType(buf, directType());
+
+        buf.putLong(rcvCnt);
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads the 8-byte body only; the message type is not read here. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (buf.remaining() < MESSAGE_SIZE)
+            return false;
+
+        rcvCnt = buf.getLong();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(RecoveryLastReceivedMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 9290f24..1f0061f 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -141,9 +141,9 @@ import org.jsr166.ConcurrentLinkedDeque8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
-import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
-import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NEED_WAIT;
-import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NODE_STOPPING;
+import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
+import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NEED_WAIT;
+import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NODE_STOPPING;
 
 /**
  * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -3923,7 +3923,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      * @param buf Byte buffer.
      * @param type Message type.
      */
-    private static void writeMessageType(ByteBuffer buf, short type) {
+    static void writeMessageType(ByteBuffer buf, short type) {
         buf.put((byte)(type & 0xFF));
         buf.put((byte)((type >> 8) & 0xFF));
     }
@@ -4440,389 +4440,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
-     * Handshake message.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public static class HandshakeMessage implements Message {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Message body size in bytes. */
-        private static final int MESSAGE_SIZE = 32;
-
-        /** Full message size (with message type) in bytes. */
-        public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
-        /** */
-        private UUID nodeId;
-
-        /** */
-        private long rcvCnt;
-
-        /** */
-        private long connectCnt;
-
-        /**
-         * Default constructor required by {@link Message}.
-         */
-        public HandshakeMessage() {
-            // No-op.
-        }
-
-        /**
-         * @param nodeId Node ID.
-         * @param connectCnt Connect count.
-         * @param rcvCnt Number of received messages.
-         */
-        public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
-            assert nodeId != null;
-            assert rcvCnt >= 0 : rcvCnt;
-
-            this.nodeId = nodeId;
-            this.connectCnt = connectCnt;
-            this.rcvCnt = rcvCnt;
-        }
-
-        /**
-         * @return Connection index.
-         */
-        public int connectionIndex() {
-            return 0;
-        }
-
-        /**
-         * @return Connect count.
-         */
-        public long connectCount() {
-            return connectCnt;
-        }
-
-        /**
-         * @return Number of received messages.
-         */
-        public long received() {
-            return rcvCnt;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        public UUID nodeId() {
-            return nodeId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-            if (buf.remaining() < MESSAGE_FULL_SIZE)
-                return false;
-
-            writeMessageType(buf, directType());
-
-            byte[] bytes = U.uuidToBytes(nodeId);
-
-            assert bytes.length == 16 : bytes.length;
-
-            buf.put(bytes);
-
-            buf.putLong(rcvCnt);
-
-            buf.putLong(connectCnt);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-            if (buf.remaining() < MESSAGE_SIZE)
-                return false;
-
-            byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE];
-
-            buf.get(nodeIdBytes);
-
-            nodeId = U.bytesToUuid(nodeIdBytes, 0);
-
-            rcvCnt = buf.getLong();
-
-            connectCnt = buf.getLong();
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            return HANDSHAKE_MSG_TYPE;
-        }
-
-        /** {@inheritDoc} */
-        @Override public byte fieldsCount() {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(HandshakeMessage.class, this);
-        }
-    }
-
-    /**
-     * Updated handshake message.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public static class HandshakeMessage2 extends HandshakeMessage {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private int connIdx;
-
-        /**
-         *
-         */
-        public HandshakeMessage2() {
-            // No-op.
-        }
-
-        /**
-         * @param nodeId Node ID.
-         * @param connectCnt Connect count.
-         * @param rcvCnt Number of received messages.
-         * @param connIdx Connection index.
-         */
-        HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
-            super(nodeId, connectCnt, rcvCnt);
-
-            this.connIdx = connIdx;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            return -44;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int connectionIndex() {
-            return connIdx;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-            if (!super.writeTo(buf, writer))
-                return false;
-
-            if (buf.remaining() < 4)
-                return false;
-
-            buf.putInt(connIdx);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-            if (!super.readFrom(buf, reader))
-                return false;
-
-            if (buf.remaining() < 4)
-                return false;
-
-            connIdx = buf.getInt();
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(HandshakeMessage2.class, this);
-        }
-    }
-
-    /**
-     * Recovery acknowledgment message.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public static class RecoveryLastReceivedMessage implements Message {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        static final long ALREADY_CONNECTED = -1;
-
-        /** */
-        static final long NODE_STOPPING = -2;
-
-        /** Need wait. */
-        static final long NEED_WAIT = -3;
-
-        /** Message body size in bytes. */
-        private static final int MESSAGE_SIZE = 8;
-
-        /** Full message size (with message type) in bytes. */
-        public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
-        /** */
-        private long rcvCnt;
-
-        /**
-         * Default constructor required by {@link Message}.
-         */
-        public RecoveryLastReceivedMessage() {
-            // No-op.
-        }
-
-        /**
-         * @param rcvCnt Number of received messages.
-         */
-        public RecoveryLastReceivedMessage(long rcvCnt) {
-            this.rcvCnt = rcvCnt;
-        }
-
-        /**
-         * @return Number of received messages.
-         */
-        public long received() {
-            return rcvCnt;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-            if (buf.remaining() < MESSAGE_FULL_SIZE)
-                return false;
-
-            writeMessageType(buf, directType());
-
-            buf.putLong(rcvCnt);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-            if (buf.remaining() < MESSAGE_SIZE)
-                return false;
-
-            rcvCnt = buf.getLong();
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            return RECOVERY_LAST_ID_MSG_TYPE;
-        }
-
-        /** {@inheritDoc} */
-        @Override public byte fieldsCount() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(RecoveryLastReceivedMessage.class, this);
-        }
-    }
-
-    /**
-     * Node ID message.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public static class NodeIdMessage implements Message {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Message body size (with message type) in bytes. */
-        private static final int MESSAGE_SIZE = 16;
-
-        /** Full message size (with message type) in bytes. */
-        public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
-        /** */
-        private byte[] nodeIdBytes;
-
-        /** */
-        private byte[] nodeIdBytesWithType;
-
-        /** */
-        public NodeIdMessage() {
-            // No-op.
-        }
-
-        /**
-         * @param nodeId Node ID.
-         */
-        private NodeIdMessage(UUID nodeId) {
-            assert nodeId != null;
-
-            nodeIdBytes = U.uuidToBytes(nodeId);
-
-            assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE;
-
-            nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE];
-
-            nodeIdBytesWithType[0] = (byte)(NODE_ID_MSG_TYPE & 0xFF);
-            nodeIdBytesWithType[1] = (byte)((NODE_ID_MSG_TYPE >> 8) & 0xFF);
-
-            System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 2, nodeIdBytes.length);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-            assert nodeIdBytes.length == MESSAGE_SIZE;
-
-            if (buf.remaining() < MESSAGE_FULL_SIZE)
-                return false;
-
-            writeMessageType(buf, directType());
-
-            buf.put(nodeIdBytes);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-            if (buf.remaining() < MESSAGE_SIZE)
-                return false;
-
-            nodeIdBytes = new byte[MESSAGE_SIZE];
-
-            buf.get(nodeIdBytes);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            return NODE_ID_MSG_TYPE;
-        }
-
-        /** {@inheritDoc} */
-        @Override public byte fieldsCount() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(NodeIdMessage.class, this);
-        }
-    }
-
-    /**
      *
      */
     private class ConnectGateway {