You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:34 UTC
[26/50] [abbrv] ignite git commit: Minor: moved inner
TcpCommunicationSpi messages to top-level classes.
Minor: moved inner TcpCommunicationSpi messages to top-level classes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c576d5a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c576d5a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c576d5a0
Branch: refs/heads/ignite-zk
Commit: c576d5a0ab1bbaab4a1b23a2c03a21ef2a7b610c
Parents: 061ec6a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 15 11:28:18 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 15 11:28:18 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 14 +-
.../spi/communication/tcp/HandshakeMessage.java | 156 ++++++++
.../communication/tcp/HandshakeMessage2.java | 95 +++++
.../spi/communication/tcp/NodeIdMessage.java | 115 ++++++
.../tcp/RecoveryLastReceivedMessage.java | 113 ++++++
.../communication/tcp/TcpCommunicationSpi.java | 391 +------------------
6 files changed, 492 insertions(+), 392 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 97e06bf..791dd91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -137,11 +137,11 @@ import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse;
import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage;
import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
-import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
import org.apache.ignite.internal.util.GridByteArrayList;
@@ -153,6 +153,10 @@ import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
+import org.apache.ignite.spi.communication.tcp.HandshakeMessage;
+import org.apache.ignite.spi.communication.tcp.HandshakeMessage2;
+import org.apache.ignite.spi.communication.tcp.NodeIdMessage;
+import org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.jsr166.ConcurrentHashMap8;
@@ -221,7 +225,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case -44:
- msg = new TcpCommunicationSpi.HandshakeMessage2();
+ msg = new HandshakeMessage2();
break;
@@ -291,17 +295,17 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case TcpCommunicationSpi.NODE_ID_MSG_TYPE:
- msg = new TcpCommunicationSpi.NodeIdMessage();
+ msg = new NodeIdMessage();
break;
case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE:
- msg = new TcpCommunicationSpi.RecoveryLastReceivedMessage();
+ msg = new RecoveryLastReceivedMessage();
break;
case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE:
- msg = new TcpCommunicationSpi.HandshakeMessage();
+ msg = new HandshakeMessage();
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
new file mode 100644
index 0000000..00e8e46
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Handshake message: carries node ID, number of received messages and connect count.
+ */
+public class HandshakeMessage implements Message {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Message body size in bytes: 16-byte node ID plus two {@code long} counters. */
+ private static final int MESSAGE_SIZE = 32;
+
+ /** Full message size (with message type) in bytes. */
+ public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+ /** Node ID. */
+ private UUID nodeId;
+
+ /** Number of received messages. */
+ private long rcvCnt;
+
+ /** Connect count. */
+ private long connectCnt;
+
+ /**
+ * Default constructor required by {@link Message}.
+ */
+ public HandshakeMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param connectCnt Connect count.
+ * @param rcvCnt Number of received messages.
+ */
+ public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
+ assert nodeId != null;
+ assert rcvCnt >= 0 : rcvCnt;
+
+ this.nodeId = nodeId;
+ this.connectCnt = connectCnt;
+ this.rcvCnt = rcvCnt;
+ }
+
+ /**
+ * @return Connection index, always {@code 0} in this class.
+ */
+ public int connectionIndex() {
+ return 0;
+ }
+
+ /**
+ * @return Connect count.
+ */
+ public long connectCount() {
+ return connectCnt;
+ }
+
+ /**
+ * @return Number of received messages.
+ */
+ public long received() {
+ return rcvCnt;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} Writes nothing and returns {@code false} unless the full message fits. */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ if (buf.remaining() < MESSAGE_FULL_SIZE)
+ return false;
+
+ TcpCommunicationSpi.writeMessageType(buf, directType());
+
+ byte[] bytes = U.uuidToBytes(nodeId);
+
+ assert bytes.length == 16 : bytes.length;
+
+ buf.put(bytes);
+
+ buf.putLong(rcvCnt);
+
+ buf.putLong(connectCnt);
+
+ return true;
+ }
+
+ /** {@inheritDoc} Reads nothing and returns {@code false} unless the whole body is available. */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (buf.remaining() < MESSAGE_SIZE)
+ return false;
+
+ byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE];
+
+ buf.get(nodeIdBytes);
+
+ nodeId = U.bytesToUuid(nodeIdBytes, 0);
+
+ rcvCnt = buf.getLong();
+
+ connectCnt = buf.getLong();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TcpCommunicationSpi.HANDSHAKE_MSG_TYPE;
+ }
+
+ /** {@inheritDoc} Not supported by this message: always throws {@link UnsupportedOperationException}. */
+ @Override public byte fieldsCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HandshakeMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
new file mode 100644
index 0000000..1e8fdd9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Updated handshake message: the base handshake followed by a 4-byte connection index.
+ */
+public class HandshakeMessage2 extends HandshakeMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Connection index. */
+ private int connIdx;
+
+ /**
+ * Default constructor, used when the message is created by the message factory.
+ */
+ public HandshakeMessage2() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param connectCnt Connect count.
+ * @param rcvCnt Number of received messages.
+ * @param connIdx Connection index.
+ */
+ HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
+ super(nodeId, connectCnt, rcvCnt);
+
+ this.connIdx = connIdx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -44;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int connectionIndex() {
+ return connIdx;
+ }
+
+ /** {@inheritDoc} Room for base message plus 4-byte index is checked first, so a failed call writes nothing. */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ if (buf.remaining() < MESSAGE_FULL_SIZE + 4)
+ return false;
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ buf.putInt(connIdx);
+
+ return true;
+ }
+
+ /** {@inheritDoc} Base body plus 4-byte index must be available first, so a failed call consumes nothing. */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (buf.remaining() < MESSAGE_FULL_SIZE - DIRECT_TYPE_SIZE + 4)
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ connIdx = buf.getInt();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HandshakeMessage2.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
new file mode 100644
index 0000000..d05b7ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Node ID message.
+ */
+public class NodeIdMessage implements Message {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Message body size (without message type) in bytes. */
+ static final int MESSAGE_SIZE = 16;
+
+ /** Full message size (with message type) in bytes. */
+ public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+ /** Node ID bytes (message body). */
+ byte[] nodeIdBytes;
+
+ /** Message type (low byte first) followed by node ID bytes. */
+ byte[] nodeIdBytesWithType;
+
+ /** Default constructor; node ID bytes are set later by {@code readFrom}. */
+ public NodeIdMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ NodeIdMessage(UUID nodeId) {
+ assert nodeId != null;
+
+ nodeIdBytes = U.uuidToBytes(nodeId);
+
+ assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE;
+
+ nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE];
+
+ nodeIdBytesWithType[0] = (byte)(TcpCommunicationSpi.NODE_ID_MSG_TYPE & 0xFF);
+ nodeIdBytesWithType[1] = (byte)((TcpCommunicationSpi.NODE_ID_MSG_TYPE >> 8) & 0xFF);
+
+ System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 2, nodeIdBytes.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} Writes nothing and returns {@code false} unless the full message fits. */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ assert nodeIdBytes.length == MESSAGE_SIZE;
+
+ if (buf.remaining() < MESSAGE_FULL_SIZE)
+ return false;
+
+ TcpCommunicationSpi.writeMessageType(buf, directType());
+
+ buf.put(nodeIdBytes);
+
+ return true;
+ }
+
+ /** {@inheritDoc} Reads nothing and returns {@code false} unless the whole body is available. */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (buf.remaining() < MESSAGE_SIZE)
+ return false;
+
+ nodeIdBytes = new byte[MESSAGE_SIZE];
+
+ buf.get(nodeIdBytes);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TcpCommunicationSpi.NODE_ID_MSG_TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(NodeIdMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
new file mode 100644
index 0000000..5460084
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Recovery acknowledgment message.
+ */
+public class RecoveryLastReceivedMessage implements Message {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Already connected (presumably a reserved received-count value; confirm against TcpCommunicationSpi). */
+ static final long ALREADY_CONNECTED = -1;
+
+ /** Node stopping (presumably a reserved received-count value; confirm against TcpCommunicationSpi). */
+ static final long NODE_STOPPING = -2;
+
+ /** Need wait. */
+ static final long NEED_WAIT = -3;
+
+ /** Message body size in bytes (a single {@code long}). */
+ private static final int MESSAGE_SIZE = 8;
+
+ /** Full message size (with message type) in bytes. */
+ public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+ /** Number of received messages. */
+ private long rcvCnt;
+
+ /**
+ * Default constructor required by {@link Message}.
+ */
+ public RecoveryLastReceivedMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param rcvCnt Number of received messages.
+ */
+ public RecoveryLastReceivedMessage(long rcvCnt) {
+ this.rcvCnt = rcvCnt;
+ }
+
+ /**
+ * @return Number of received messages.
+ */
+ public long received() {
+ return rcvCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} Writes nothing and returns {@code false} unless the full message fits. */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ if (buf.remaining() < MESSAGE_FULL_SIZE)
+ return false;
+
+ TcpCommunicationSpi.writeMessageType(buf, directType());
+
+ buf.putLong(rcvCnt);
+
+ return true;
+ }
+
+ /** {@inheritDoc} Reads nothing and returns {@code false} unless the whole body is available. */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (buf.remaining() < MESSAGE_SIZE)
+ return false;
+
+ rcvCnt = buf.getLong();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RecoveryLastReceivedMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c576d5a0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 9290f24..1f0061f 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -141,9 +141,9 @@ import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
-import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
-import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NEED_WAIT;
-import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NODE_STOPPING;
+import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
+import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NEED_WAIT;
+import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NODE_STOPPING;
/**
* <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -3923,7 +3923,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @param buf Byte buffer.
* @param type Message type.
*/
- private static void writeMessageType(ByteBuffer buf, short type) {
+ static void writeMessageType(ByteBuffer buf, short type) {
buf.put((byte)(type & 0xFF));
buf.put((byte)((type >> 8) & 0xFF));
}
@@ -4440,389 +4440,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/**
- * Handshake message.
- */
- @SuppressWarnings("PublicInnerClass")
- public static class HandshakeMessage implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message body size in bytes. */
- private static final int MESSAGE_SIZE = 32;
-
- /** Full message size (with message type) in bytes. */
- public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
- /** */
- private UUID nodeId;
-
- /** */
- private long rcvCnt;
-
- /** */
- private long connectCnt;
-
- /**
- * Default constructor required by {@link Message}.
- */
- public HandshakeMessage() {
- // No-op.
- }
-
- /**
- * @param nodeId Node ID.
- * @param connectCnt Connect count.
- * @param rcvCnt Number of received messages.
- */
- public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
- assert nodeId != null;
- assert rcvCnt >= 0 : rcvCnt;
-
- this.nodeId = nodeId;
- this.connectCnt = connectCnt;
- this.rcvCnt = rcvCnt;
- }
-
- /**
- * @return Connection index.
- */
- public int connectionIndex() {
- return 0;
- }
-
- /**
- * @return Connect count.
- */
- public long connectCount() {
- return connectCnt;
- }
-
- /**
- * @return Number of received messages.
- */
- public long received() {
- return rcvCnt;
- }
-
- /**
- * @return Node ID.
- */
- public UUID nodeId() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- writeMessageType(buf, directType());
-
- byte[] bytes = U.uuidToBytes(nodeId);
-
- assert bytes.length == 16 : bytes.length;
-
- buf.put(bytes);
-
- buf.putLong(rcvCnt);
-
- buf.putLong(connectCnt);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (buf.remaining() < MESSAGE_SIZE)
- return false;
-
- byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE];
-
- buf.get(nodeIdBytes);
-
- nodeId = U.bytesToUuid(nodeIdBytes, 0);
-
- rcvCnt = buf.getLong();
-
- connectCnt = buf.getLong();
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return HANDSHAKE_MSG_TYPE;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HandshakeMessage.class, this);
- }
- }
-
- /**
- * Updated handshake message.
- */
- @SuppressWarnings("PublicInnerClass")
- public static class HandshakeMessage2 extends HandshakeMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private int connIdx;
-
- /**
- *
- */
- public HandshakeMessage2() {
- // No-op.
- }
-
- /**
- * @param nodeId Node ID.
- * @param connectCnt Connect count.
- * @param rcvCnt Number of received messages.
- * @param connIdx Connection index.
- */
- HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
- super(nodeId, connectCnt, rcvCnt);
-
- this.connIdx = connIdx;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return -44;
- }
-
- /** {@inheritDoc} */
- @Override public int connectionIndex() {
- return connIdx;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (!super.writeTo(buf, writer))
- return false;
-
- if (buf.remaining() < 4)
- return false;
-
- buf.putInt(connIdx);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (!super.readFrom(buf, reader))
- return false;
-
- if (buf.remaining() < 4)
- return false;
-
- connIdx = buf.getInt();
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HandshakeMessage2.class, this);
- }
- }
-
- /**
- * Recovery acknowledgment message.
- */
- @SuppressWarnings("PublicInnerClass")
- public static class RecoveryLastReceivedMessage implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- static final long ALREADY_CONNECTED = -1;
-
- /** */
- static final long NODE_STOPPING = -2;
-
- /** Need wait. */
- static final long NEED_WAIT = -3;
-
- /** Message body size in bytes. */
- private static final int MESSAGE_SIZE = 8;
-
- /** Full message size (with message type) in bytes. */
- public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
- /** */
- private long rcvCnt;
-
- /**
- * Default constructor required by {@link Message}.
- */
- public RecoveryLastReceivedMessage() {
- // No-op.
- }
-
- /**
- * @param rcvCnt Number of received messages.
- */
- public RecoveryLastReceivedMessage(long rcvCnt) {
- this.rcvCnt = rcvCnt;
- }
-
- /**
- * @return Number of received messages.
- */
- public long received() {
- return rcvCnt;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- writeMessageType(buf, directType());
-
- buf.putLong(rcvCnt);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (buf.remaining() < MESSAGE_SIZE)
- return false;
-
- rcvCnt = buf.getLong();
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return RECOVERY_LAST_ID_MSG_TYPE;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(RecoveryLastReceivedMessage.class, this);
- }
- }
-
- /**
- * Node ID message.
- */
- @SuppressWarnings("PublicInnerClass")
- public static class NodeIdMessage implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message body size (with message type) in bytes. */
- private static final int MESSAGE_SIZE = 16;
-
- /** Full message size (with message type) in bytes. */
- public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
- /** */
- private byte[] nodeIdBytes;
-
- /** */
- private byte[] nodeIdBytesWithType;
-
- /** */
- public NodeIdMessage() {
- // No-op.
- }
-
- /**
- * @param nodeId Node ID.
- */
- private NodeIdMessage(UUID nodeId) {
- assert nodeId != null;
-
- nodeIdBytes = U.uuidToBytes(nodeId);
-
- assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE;
-
- nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE];
-
- nodeIdBytesWithType[0] = (byte)(NODE_ID_MSG_TYPE & 0xFF);
- nodeIdBytesWithType[1] = (byte)((NODE_ID_MSG_TYPE >> 8) & 0xFF);
-
- System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 2, nodeIdBytes.length);
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- assert nodeIdBytes.length == MESSAGE_SIZE;
-
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- writeMessageType(buf, directType());
-
- buf.put(nodeIdBytes);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (buf.remaining() < MESSAGE_SIZE)
- return false;
-
- nodeIdBytes = new byte[MESSAGE_SIZE];
-
- buf.get(nodeIdBytes);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return NODE_ID_MSG_TYPE;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(NodeIdMessage.class, this);
- }
- }
-
- /**
*
*/
private class ConnectGateway {