You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 08:04:36 UTC
[28/50] [abbrv] ignite git commit: Minor: moved inner
TcpCommunicationSpi messages to top-level classes.
Minor: moved inner TcpCommunicationSpi messages to top-level classes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e358ae24
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e358ae24
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e358ae24
Branch: refs/heads/ignite-zk
Commit: e358ae241b334b6c03fc62ab0eee99f08eeb778a
Parents: c576d5a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 15 13:50:19 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 15 13:50:19 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 8 +-
.../spi/communication/tcp/HandshakeMessage.java | 156 ------------------
.../communication/tcp/HandshakeMessage2.java | 95 -----------
.../spi/communication/tcp/NodeIdMessage.java | 115 --------------
.../tcp/RecoveryLastReceivedMessage.java | 113 -------------
.../communication/tcp/TcpCommunicationSpi.java | 31 ++--
.../tcp/messages/HandshakeMessage.java | 157 +++++++++++++++++++
.../tcp/messages/HandshakeMessage2.java | 95 +++++++++++
.../tcp/messages/NodeIdMessage.java | 128 +++++++++++++++
.../messages/RecoveryLastReceivedMessage.java | 114 ++++++++++++++
.../tcp/messages/package-info.java | 22 +++
11 files changed, 541 insertions(+), 493 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 791dd91..78cb7a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -153,11 +153,11 @@ import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
-import org.apache.ignite.spi.communication.tcp.HandshakeMessage;
-import org.apache.ignite.spi.communication.tcp.HandshakeMessage2;
-import org.apache.ignite.spi.communication.tcp.NodeIdMessage;
-import org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
+import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
+import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
+import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
import org.jsr166.ConcurrentHashMap8;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
deleted file mode 100644
index 00e8e46..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.communication.tcp;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Handshake message.
- */
-public class HandshakeMessage implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message body size in bytes. */
- private static final int MESSAGE_SIZE = 32;
-
- /** Full message size (with message type) in bytes. */
- public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
- /** */
- private UUID nodeId;
-
- /** */
- private long rcvCnt;
-
- /** */
- private long connectCnt;
-
- /**
- * Default constructor required by {@link Message}.
- */
- public HandshakeMessage() {
- // No-op.
- }
-
- /**
- * @param nodeId Node ID.
- * @param connectCnt Connect count.
- * @param rcvCnt Number of received messages.
- */
- public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
- assert nodeId != null;
- assert rcvCnt >= 0 : rcvCnt;
-
- this.nodeId = nodeId;
- this.connectCnt = connectCnt;
- this.rcvCnt = rcvCnt;
- }
-
- /**
- * @return Connection index.
- */
- public int connectionIndex() {
- return 0;
- }
-
- /**
- * @return Connect count.
- */
- public long connectCount() {
- return connectCnt;
- }
-
- /**
- * @return Number of received messages.
- */
- public long received() {
- return rcvCnt;
- }
-
- /**
- * @return Node ID.
- */
- public UUID nodeId() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- TcpCommunicationSpi.writeMessageType(buf, directType());
-
- byte[] bytes = U.uuidToBytes(nodeId);
-
- assert bytes.length == 16 : bytes.length;
-
- buf.put(bytes);
-
- buf.putLong(rcvCnt);
-
- buf.putLong(connectCnt);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (buf.remaining() < MESSAGE_SIZE)
- return false;
-
- byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE];
-
- buf.get(nodeIdBytes);
-
- nodeId = U.bytesToUuid(nodeIdBytes, 0);
-
- rcvCnt = buf.getLong();
-
- connectCnt = buf.getLong();
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return TcpCommunicationSpi.HANDSHAKE_MSG_TYPE;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HandshakeMessage.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
deleted file mode 100644
index 1e8fdd9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.communication.tcp;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Updated handshake message.
- */
-public class HandshakeMessage2 extends HandshakeMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private int connIdx;
-
- /**
- *
- */
- public HandshakeMessage2() {
- // No-op.
- }
-
- /**
- * @param nodeId Node ID.
- * @param connectCnt Connect count.
- * @param rcvCnt Number of received messages.
- * @param connIdx Connection index.
- */
- HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
- super(nodeId, connectCnt, rcvCnt);
-
- this.connIdx = connIdx;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return -44;
- }
-
- /** {@inheritDoc} */
- @Override public int connectionIndex() {
- return connIdx;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (!super.writeTo(buf, writer))
- return false;
-
- if (buf.remaining() < 4)
- return false;
-
- buf.putInt(connIdx);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (!super.readFrom(buf, reader))
- return false;
-
- if (buf.remaining() < 4)
- return false;
-
- connIdx = buf.getInt();
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HandshakeMessage2.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
deleted file mode 100644
index d05b7ff..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.communication.tcp;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Node ID message.
- */
-public class NodeIdMessage implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message body size (with message type) in bytes. */
- static final int MESSAGE_SIZE = 16;
-
- /** Full message size (with message type) in bytes. */
- public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
- /** */
- byte[] nodeIdBytes;
-
- /** */
- byte[] nodeIdBytesWithType;
-
- /** */
- public NodeIdMessage() {
- // No-op.
- }
-
- /**
- * @param nodeId Node ID.
- */
- NodeIdMessage(UUID nodeId) {
- assert nodeId != null;
-
- nodeIdBytes = U.uuidToBytes(nodeId);
-
- assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE;
-
- nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE];
-
- nodeIdBytesWithType[0] = (byte)(TcpCommunicationSpi.NODE_ID_MSG_TYPE & 0xFF);
- nodeIdBytesWithType[1] = (byte)((TcpCommunicationSpi.NODE_ID_MSG_TYPE >> 8) & 0xFF);
-
- System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 2, nodeIdBytes.length);
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- assert nodeIdBytes.length == MESSAGE_SIZE;
-
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- TcpCommunicationSpi.writeMessageType(buf, directType());
-
- buf.put(nodeIdBytes);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (buf.remaining() < MESSAGE_SIZE)
- return false;
-
- nodeIdBytes = new byte[MESSAGE_SIZE];
-
- buf.get(nodeIdBytes);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return TcpCommunicationSpi.NODE_ID_MSG_TYPE;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(NodeIdMessage.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
deleted file mode 100644
index 5460084..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.communication.tcp;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Recovery acknowledgment message.
- */
-public class RecoveryLastReceivedMessage implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- static final long ALREADY_CONNECTED = -1;
-
- /** */
- static final long NODE_STOPPING = -2;
-
- /** Need wait. */
- static final long NEED_WAIT = -3;
-
- /** Message body size in bytes. */
- private static final int MESSAGE_SIZE = 8;
-
- /** Full message size (with message type) in bytes. */
- public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
-
- /** */
- private long rcvCnt;
-
- /**
- * Default constructor required by {@link Message}.
- */
- public RecoveryLastReceivedMessage() {
- // No-op.
- }
-
- /**
- * @param rcvCnt Number of received messages.
- */
- public RecoveryLastReceivedMessage(long rcvCnt) {
- this.rcvCnt = rcvCnt;
- }
-
- /**
- * @return Number of received messages.
- */
- public long received() {
- return rcvCnt;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- if (buf.remaining() < MESSAGE_FULL_SIZE)
- return false;
-
- TcpCommunicationSpi.writeMessageType(buf, directType());
-
- buf.putLong(rcvCnt);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- if (buf.remaining() < MESSAGE_SIZE)
- return false;
-
- rcvCnt = buf.getLong();
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(RecoveryLastReceivedMessage.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1f0061f..69da9ca 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -132,6 +132,10 @@ import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
+import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
+import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
+import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
@@ -141,9 +145,9 @@ import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
-import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
-import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NEED_WAIT;
-import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NODE_STOPPING;
+import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
+import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT;
+import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING;
/**
* <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -456,7 +460,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
ConnectionKey connKey;
if (msg instanceof NodeIdMessage) {
- sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
+ sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0);
connKey = new ConnectionKey(sndId, 0, -1);
}
else {
@@ -3550,10 +3554,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (isSslEnabled()) {
assert sslHnd != null;
- ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)));
+ ch.write(sslHnd.encrypt(ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId()))));
}
else
- ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
+ ch.write(ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId())));
}
if (recovery != null) {
@@ -3806,20 +3810,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @return Node ID message.
*/
private NodeIdMessage nodeIdMessage() {
+ return new NodeIdMessage(safeLocalNodeId());
+ }
+
+ /**
+ * @return Local node ID.
+ */
+ private UUID safeLocalNodeId() {
ClusterNode locNode = getLocalNode();
UUID id;
if (locNode == null) {
U.warn(log, "Local node is not started or fully initialized [isStopping=" +
- getSpiContext().isStopping() + ']');
+ getSpiContext().isStopping() + ']');
id = new UUID(0, 0);
}
else
id = locNode.id();
- return new NodeIdMessage(id);
+ return id;
}
/** {@inheritDoc} */
@@ -3923,7 +3934,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
* @param buf Byte buffer.
* @param type Message type.
*/
- static void writeMessageType(ByteBuffer buf, short type) {
+ public static void writeMessageType(ByteBuffer buf, short type) {
buf.put((byte)(type & 0xFF));
buf.put((byte)((type >> 8) & 0xFF));
}
@@ -4426,7 +4437,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
out.write(U.IGNITE_HEADER);
writeMessageType(out, NODE_ID_MSG_TYPE);
- out.write(msg.nodeIdBytes);
+ out.write(msg.nodeIdBytes());
out.flush();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
new file mode 100644
index 0000000..e3be9c9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.messages;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Handshake message.
+ */
+public class HandshakeMessage implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message body size in bytes. */
+ private static final int MESSAGE_SIZE = 32;
+
+ /** Full message size (with message type) in bytes. */
+ public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+ /** */
+ private UUID nodeId;
+
+ /** */
+ private long rcvCnt;
+
+ /** */
+ private long connectCnt;
+
+ /**
+ * Default constructor required by {@link Message}.
+ */
+ public HandshakeMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param connectCnt Connect count.
+ * @param rcvCnt Number of received messages.
+ */
+ public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
+ assert nodeId != null;
+ assert rcvCnt >= 0 : rcvCnt;
+
+ this.nodeId = nodeId;
+ this.connectCnt = connectCnt;
+ this.rcvCnt = rcvCnt;
+ }
+
+ /**
+ * @return Connection index.
+ */
+ public int connectionIndex() {
+ return 0;
+ }
+
+ /**
+ * @return Connect count.
+ */
+ public long connectCount() {
+ return connectCnt;
+ }
+
+ /**
+ * @return Number of received messages.
+ */
+ public long received() {
+ return rcvCnt;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ if (buf.remaining() < MESSAGE_FULL_SIZE)
+ return false;
+
+ TcpCommunicationSpi.writeMessageType(buf, directType());
+
+ byte[] bytes = U.uuidToBytes(nodeId);
+
+ assert bytes.length == 16 : bytes.length;
+
+ buf.put(bytes);
+
+ buf.putLong(rcvCnt);
+
+ buf.putLong(connectCnt);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (buf.remaining() < MESSAGE_SIZE)
+ return false;
+
+ byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE];
+
+ buf.get(nodeIdBytes);
+
+ nodeId = U.bytesToUuid(nodeIdBytes, 0);
+
+ rcvCnt = buf.getLong();
+
+ connectCnt = buf.getLong();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TcpCommunicationSpi.HANDSHAKE_MSG_TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HandshakeMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
new file mode 100644
index 0000000..f27a825
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.messages;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Updated handshake message.
+ */
+public class HandshakeMessage2 extends HandshakeMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int connIdx;
+
+ /**
+ *
+ */
+ public HandshakeMessage2() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param connectCnt Connect count.
+ * @param rcvCnt Number of received messages.
+ * @param connIdx Connection index.
+ */
+ public HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) {
+ super(nodeId, connectCnt, rcvCnt);
+
+ this.connIdx = connIdx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -44;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int connectionIndex() {
+ return connIdx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (buf.remaining() < 4)
+ return false;
+
+ buf.putInt(connIdx);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ if (buf.remaining() < 4)
+ return false;
+
+ connIdx = buf.getInt();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HandshakeMessage2.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
new file mode 100644
index 0000000..2c6aa30
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.messages;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Node ID message.
+ */
+public class NodeIdMessage implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message body size (with message type) in bytes. */
+ static final int MESSAGE_SIZE = 16;
+
+ /** Full message size (with message type) in bytes. */
+ public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+ /** */
+ private byte[] nodeIdBytes;
+
+ /** */
+ public NodeIdMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ public NodeIdMessage(UUID nodeId) {
+ assert nodeId != null;
+
+ nodeIdBytes = U.uuidToBytes(nodeId);
+
+ assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE;
+ }
+
+ /**
+ * @return Node ID bytes.
+ */
+ public byte[] nodeIdBytes() {
+ return nodeIdBytes;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return Marshalled node ID bytes with direct message type.
+ */
+ public static byte[] nodeIdBytesWithType(UUID nodeId) {
+ byte[] nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE];
+
+ nodeIdBytesWithType[0] = (byte)(TcpCommunicationSpi.NODE_ID_MSG_TYPE & 0xFF);
+ nodeIdBytesWithType[1] = (byte)((TcpCommunicationSpi.NODE_ID_MSG_TYPE >> 8) & 0xFF);
+
+ U.uuidToBytes(nodeId, nodeIdBytesWithType, 2);
+
+ return nodeIdBytesWithType;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ assert nodeIdBytes.length == MESSAGE_SIZE;
+
+ if (buf.remaining() < MESSAGE_FULL_SIZE)
+ return false;
+
+ TcpCommunicationSpi.writeMessageType(buf, directType());
+
+ buf.put(nodeIdBytes);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (buf.remaining() < MESSAGE_SIZE)
+ return false;
+
+ nodeIdBytes = new byte[MESSAGE_SIZE];
+
+ buf.get(nodeIdBytes);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TcpCommunicationSpi.NODE_ID_MSG_TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(NodeIdMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
new file mode 100644
index 0000000..eef2655
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.messages;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ * Recovery acknowledgment message.
+ */
+public class RecoveryLastReceivedMessage implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public static final long ALREADY_CONNECTED = -1;
+
+ /** */
+ public static final long NODE_STOPPING = -2;
+
+ /** Need wait. */
+ public static final long NEED_WAIT = -3;
+
+ /** Message body size in bytes. */
+ private static final int MESSAGE_SIZE = 8;
+
+ /** Full message size (with message type) in bytes. */
+ public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE;
+
+ /** */
+ private long rcvCnt;
+
+ /**
+ * Default constructor required by {@link Message}.
+ */
+ public RecoveryLastReceivedMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param rcvCnt Number of received messages.
+ */
+ public RecoveryLastReceivedMessage(long rcvCnt) {
+ this.rcvCnt = rcvCnt;
+ }
+
+ /**
+ * @return Number of received messages.
+ */
+ public long received() {
+ return rcvCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ if (buf.remaining() < MESSAGE_FULL_SIZE)
+ return false;
+
+ TcpCommunicationSpi.writeMessageType(buf, directType());
+
+ buf.putLong(rcvCnt);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ if (buf.remaining() < MESSAGE_SIZE)
+ return false;
+
+ rcvCnt = buf.getLong();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RecoveryLastReceivedMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java
new file mode 100644
index 0000000..662dd26
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains implementation messages.
+ */
+package org.apache.ignite.spi.communication.tcp.messages;
\ No newline at end of file