You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 15:07:17 UTC
[16/52] [abbrv] incubator-ignite git commit: # Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
new file mode 100644
index 0000000..4632eaf
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
@@ -0,0 +1,308 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.spi.discovery.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.gridgain.grid.spi.discovery.DiscoveryMetricsHelper.*;
+
+/**
+ * Heartbeat message.
+ * <p>
+ * It is sent by the coordinator node across the ring once per configured period.
+ * Message makes two passes:
+ * <ol>
+ * <li>During first pass, all nodes add their metrics to the message and
+ * update local metrics with metrics currently present in the message.</li>
+ * <li>During second pass, all nodes update all metrics present in the message
+ * and remove their own metrics from the message.</li>
+ * </ol>
+ * When the message reaches the coordinator for the second time it is discarded (it has
+ * finished the second pass).
+ */
+@TcpDiscoveryRedirectToClient
+public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Metrics sets keyed by the ID of the node they were set for. */
+    @GridToStringExclude
+    private Map<UUID, MetricsSet> metrics;
+
+    /** Client node IDs. */
+    private Collection<UUID> clientNodeIds;
+
+    /**
+     * Public default no-arg constructor for {@link Externalizable} interface.
+     */
+    public TcpDiscoveryHeartbeatMessage() {
+        // No-op.
+    }
+
+    /**
+     * Creates a message with a new metrics map and a new set of client node IDs.
+     *
+     * @param creatorNodeId Creator node.
+     */
+    public TcpDiscoveryHeartbeatMessage(UUID creatorNodeId) {
+        super(creatorNodeId);
+
+        metrics = U.newHashMap(1);
+        clientNodeIds = new HashSet<>();
+    }
+
+    /**
+     * Sets metrics for a node; the message must not contain metrics for that node yet.
+     *
+     * @param nodeId Node ID.
+     * @param metrics Node metrics.
+     */
+    public void setMetrics(UUID nodeId, ClusterNodeMetrics metrics) {
+        assert nodeId != null;
+        assert metrics != null;
+        assert !this.metrics.containsKey(nodeId);
+
+        this.metrics.put(nodeId, new MetricsSet(metrics));
+    }
+
+    /**
+     * Adds client node metrics to the metrics set of a server node that already has metrics here.
+     *
+     * @param nodeId Server node ID.
+     * @param clientNodeId Client node ID.
+     * @param metrics Node metrics.
+     */
+    public void setClientMetrics(UUID nodeId, UUID clientNodeId, ClusterNodeMetrics metrics) {
+        assert nodeId != null;
+        assert clientNodeId != null;
+        assert metrics != null;
+        assert this.metrics.containsKey(nodeId);
+
+        this.metrics.get(nodeId).addClientMetrics(clientNodeId, metrics);
+    }
+
+    /**
+     * Removes metrics for particular node from the message.
+     *
+     * @param nodeId Node ID.
+     */
+    public void removeMetrics(UUID nodeId) {
+        assert nodeId != null;
+
+        metrics.remove(nodeId);
+    }
+
+    /**
+     * Gets metrics map (the internal map itself, not a copy).
+     *
+     * @return Metrics map.
+     */
+    public Map<UUID, MetricsSet> metrics() {
+        return metrics;
+    }
+
+    /**
+     * @return {@code True} if this message contains metrics for at least one node.
+     */
+    public boolean hasMetrics() {
+        return !metrics.isEmpty();
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if this message contains metrics for the given node.
+     */
+    public boolean hasMetrics(UUID nodeId) {
+        assert nodeId != null;
+
+        return metrics.get(nodeId) != null;
+    }
+
+    /**
+     * Gets IDs of the client nodes added to this message.
+     *
+     * @return Client node IDs.
+     */
+    public Collection<UUID> clientNodeIds() {
+        return clientNodeIds;
+    }
+
+    /**
+     * Adds client node ID.
+     *
+     * @param clientNodeId Client node ID.
+     */
+    public void addClientNodeId(UUID clientNodeId) {
+        clientNodeIds.add(clientNodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        out.writeInt(metrics.size());
+
+        // The loop is a no-op for an empty map, so no separate emptiness check is needed.
+        for (Map.Entry<UUID, MetricsSet> e : metrics.entrySet()) {
+            U.writeUuid(out, e.getKey());
+            out.writeObject(e.getValue());
+        }
+
+        U.writeCollection(out, clientNodeIds);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        int metricsSize = in.readInt();
+
+        metrics = U.newHashMap(metricsSize);
+
+        for (int i = 0; i < metricsSize; i++)
+            metrics.put(U.readUuid(in), (MetricsSet)in.readObject());
+
+        clientNodeIds = U.readCollection(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryHeartbeatMessage.class, this, "super", super.toString());
+    }
+
+    /**
+     * @param metrics Metrics.
+     * @return Metrics serialized into a new array of {@code METRICS_SIZE} bytes.
+     */
+    private static byte[] serializeMetrics(ClusterNodeMetrics metrics) {
+        assert metrics != null;
+
+        byte[] buf = new byte[DiscoveryMetricsHelper.METRICS_SIZE];
+
+        serialize(buf, 0, metrics);
+
+        return buf;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param metrics Metrics.
+     * @return New array holding the 16 bytes of the node ID followed by the serialized metrics.
+     */
+    private static byte[] serializeMetrics(UUID nodeId, ClusterNodeMetrics metrics) {
+        assert nodeId != null;
+        assert metrics != null;
+
+        byte[] buf = new byte[16 + DiscoveryMetricsHelper.METRICS_SIZE];
+
+        U.longToBytes(nodeId.getMostSignificantBits(), buf, 0);
+        U.longToBytes(nodeId.getLeastSignificantBits(), buf, 8);
+
+        serialize(buf, 16, metrics);
+
+        return buf;
+    }
+
+    /**
+     * Serialized metrics of a node plus the serialized metrics of client nodes added for it.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public static class MetricsSet implements Externalizable {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Serialized metrics of the node. */
+        private byte[] metrics;
+
+        /** Serialized client metrics, each array prefixed with the client node ID; may be {@code null}. */
+        private Collection<byte[]> clientMetrics;
+
+        /** Public no-arg constructor for {@link Externalizable} interface. */
+        public MetricsSet() {
+            // No-op.
+        }
+
+        /**
+         * @param metrics Metrics; stored in serialized form.
+         */
+        public MetricsSet(ClusterNodeMetrics metrics) {
+            assert metrics != null;
+
+            this.metrics = serializeMetrics(metrics);
+        }
+
+        /**
+         * @return Deserialized metrics.
+         */
+        public ClusterNodeMetrics metrics() {
+            return deserialize(metrics, 0);
+        }
+
+        /**
+         * @return Client node IDs paired with their deserialized metrics, via {@code F.viewReadOnly}.
+         */
+        public Collection<T2<UUID, ClusterNodeMetrics>> clientMetrics() {
+            return F.viewReadOnly(clientMetrics, new C1<byte[], T2<UUID, ClusterNodeMetrics>>() {
+                @Override public T2<UUID, ClusterNodeMetrics> apply(byte[] bytes) {
+                    UUID nodeId = new UUID(U.bytesToLong(bytes, 0), U.bytesToLong(bytes, 8));
+
+                    return new T2<>(nodeId, deserialize(bytes, 16));
+                }
+            });
+        }
+
+        /**
+         * @param nodeId Client node ID.
+         * @param metrics Client metrics.
+         */
+        private void addClientMetrics(UUID nodeId, ClusterNodeMetrics metrics) {
+            assert nodeId != null;
+            assert metrics != null;
+
+            if (clientMetrics == null)
+                clientMetrics = new ArrayList<>();
+
+            clientMetrics.add(serializeMetrics(nodeId, metrics));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeByteArray(out, metrics);
+
+            out.writeInt(clientMetrics != null ? clientMetrics.size() : -1); // -1 marks a null collection.
+
+            if (clientMetrics != null) {
+                for (byte[] arr : clientMetrics)
+                    U.writeByteArray(out, arr);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            metrics = U.readByteArray(in);
+
+            int clientMetricsSize = in.readInt();
+
+            if (clientMetricsSize >= 0) {
+                clientMetrics = new ArrayList<>(clientMetricsSize);
+
+                for (int i = 0; i < clientMetricsSize; i++)
+                    clientMetrics.add(U.readByteArray(in));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
new file mode 100644
index 0000000..2546aeb
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -0,0 +1,102 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import org.gridgain.grid.spi.discovery.tcp.internal.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Initial message sent by a node that wants to enter topology.
+ * Sent to random node during SPI start. Then forwarded directly to coordinator.
+ */
+public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** New node that wants to join the topology. */
+ private TcpDiscoveryNode node;
+
+ /** Discovery data. */
+ private List<Object> discoData;
+
+ /**
+ * Public default no-arg constructor for {@link Externalizable} interface.
+ */
+ public TcpDiscoveryJoinRequestMessage() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param node New node that wants to join.
+ * @param discoData Discovery data.
+ */
+ public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, List<Object> discoData) {
+ super(node.id());
+
+ this.node = node;
+ this.discoData = discoData;
+ }
+
+ /**
+ * Gets new node that wants to join the topology.
+ *
+ * @return Node that wants to join the topology.
+ */
+ public TcpDiscoveryNode node() {
+ return node;
+ }
+
+ /**
+ * @return Discovery data.
+ */
+ public List<Object> discoveryData() {
+ return discoData;
+ }
+
+ /**
+ * @return {@code true} flag.
+ */
+ public boolean responded() {
+ return getFlag(RESPONDED_FLAG_POS);
+ }
+
+ /**
+ * @param responded Responded flag.
+ */
+ public void responded(boolean responded) {
+ setFlag(RESPONDED_FLAG_POS, responded);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ out.writeObject(node);
+ U.writeCollection(out, discoData);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ node = (TcpDiscoveryNode)in.readObject();
+ discoData = U.readList(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryJoinRequestMessage.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryLoopbackProblemMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryLoopbackProblemMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryLoopbackProblemMessage.java
new file mode 100644
index 0000000..fce327b
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryLoopbackProblemMessage.java
@@ -0,0 +1,87 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Message telling joining node that it has loopback problem (misconfiguration).
+ * This means that remote node is configured to use loopback address, but joining node is not, or vise versa.
+ */
+public class TcpDiscoveryLoopbackProblemMessage extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Remote node addresses. */
+ private Collection<String> addrs;
+
+ /** Remote node host names. */
+ private Collection<String> hostNames;
+
+ /**
+ * Public default no-arg constructor for {@link Externalizable} interface.
+ */
+ public TcpDiscoveryLoopbackProblemMessage() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param creatorNodeId Creator node ID.
+ * @param addrs Remote node addresses.
+ * @param hostNames Remote node host names.
+ */
+ public TcpDiscoveryLoopbackProblemMessage(UUID creatorNodeId, Collection<String> addrs,
+ Collection<String> hostNames) {
+ super(creatorNodeId);
+
+ this.addrs = addrs;
+ this.hostNames = hostNames;
+ }
+
+ /**
+ * @return Remote node addresses.
+ */
+ public Collection<String> addresses() {
+ return addrs;
+ }
+
+ /**
+ * @return Remote node host names.
+ */
+ public Collection<String> hostNames() {
+ return hostNames;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ U.writeCollection(out, addrs);
+ U.writeCollection(out, hostNames);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ addrs = U.readCollection(in);
+ hostNames = U.readCollection(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryLoopbackProblemMessage.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
new file mode 100644
index 0000000..ddb4ae7
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -0,0 +1,75 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Sent by coordinator across the ring to finish node add process.
+ */
+@TcpDiscoveryEnsureDelivery
+@TcpDiscoveryRedirectToClient
+public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Added node ID. */
+ private UUID nodeId;
+
+ /**
+ * Public default no-arg constructor for {@link Externalizable} interface.
+ */
+ public TcpDiscoveryNodeAddFinishedMessage() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param creatorNodeId ID of the creator node (coordinator).
+ * @param nodeId Added node ID.
+ */
+ public TcpDiscoveryNodeAddFinishedMessage(UUID creatorNodeId, UUID nodeId) {
+ super(creatorNodeId);
+
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * Gets ID of the node added.
+ *
+ * @return ID of the node added.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ U.writeUuid(out, nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ nodeId = U.readUuid(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
new file mode 100644
index 0000000..0e9317f
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -0,0 +1,246 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.spi.discovery.tcp.internal.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.tostring.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Message telling nodes that new node should be added to topology.
+ * When newly added node receives the message it connects to its next and finishes
+ * join process.
+ */
+@TcpDiscoveryEnsureDelivery
+@TcpDiscoveryRedirectToClient
+public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Added node. */
+ private TcpDiscoveryNode node;
+
+ /** Pending messages from previous node. */
+ private Collection<TcpDiscoveryAbstractMessage> msgs;
+
+ /** Discarded message ID. */
+ private IgniteUuid discardMsgId;
+
+ /** Current topology. Initialized by coordinator. */
+ @GridToStringInclude
+ private Collection<TcpDiscoveryNode> top;
+
+ /** Topology snapshots history. */
+ private Map<Long, Collection<ClusterNode>> topHist;
+
+ /** Discovery data from new node. */
+ private List<Object> newNodeDiscoData;
+
+ /** Discovery data from old nodes. */
+ private Collection<List<Object>> oldNodesDiscoData;
+
+ /** Start time of the first grid node. */
+ private long gridStartTime;
+
+ /**
+ * Public default no-arg constructor for {@link Externalizable} interface.
+ */
+ public TcpDiscoveryNodeAddedMessage() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param creatorNodeId Creator node ID.
+ * @param node Node to add to topology.
+ * @param newNodeDiscoData New Node discovery data.
+ * @param gridStartTime Start time of the first grid node.
+ */
+ public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId, TcpDiscoveryNode node,
+ List<Object> newNodeDiscoData, long gridStartTime) {
+ super(creatorNodeId);
+
+ assert node != null;
+ assert gridStartTime > 0;
+
+ this.node = node;
+ this.newNodeDiscoData = newNodeDiscoData;
+ this.gridStartTime = gridStartTime;
+
+ oldNodesDiscoData = new LinkedList<>();
+ }
+
+ /**
+ * Gets newly added node.
+ *
+ * @return New node.
+ */
+ public TcpDiscoveryNode node() {
+ return node;
+ }
+
+ /**
+ * Gets pending messages sent to new node by its previous.
+ *
+ * @return Pending messages from previous node.
+ */
+ @Nullable public Collection<TcpDiscoveryAbstractMessage> messages() {
+ return msgs;
+ }
+
+ /**
+ * Gets discarded message ID.
+ *
+ * @return Discarded message ID.
+ */
+ @Nullable public IgniteUuid discardedMessageId() {
+ return discardMsgId;
+ }
+
+ /**
+ * Sets pending messages to send to new node.
+ *
+ * @param msgs Pending messages to send to new node.
+ * @param discardMsgId Discarded message ID.
+ */
+ public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+ this.msgs = msgs;
+ this.discardMsgId = discardMsgId;
+ }
+
+ /**
+ * Gets topology.
+ *
+ * @return Current topology.
+ */
+ @Nullable public Collection<TcpDiscoveryNode> topology() {
+ return top;
+ }
+
+ /**
+ * Sets topology.
+ *
+ * @param top Current topology.
+ */
+ public void topology(@Nullable Collection<TcpDiscoveryNode> top) {
+ this.top = top;
+ }
+
+ /**
+ * Gets topology snapshots history.
+ *
+ * @return Map with topology snapshots history.
+ */
+ @Nullable public Map<Long, Collection<ClusterNode>> topologyHistory() {
+ return topHist;
+ }
+
+ /**
+ * Sets topology snapshots history.
+ *
+ * @param topHist Map with topology snapshots history.
+ */
+ public void topologyHistory(@Nullable Map<Long, Collection<ClusterNode>> topHist) {
+ this.topHist = topHist;
+ }
+
+ /**
+ * @return Discovery data from new node.
+ */
+ public List<Object> newNodeDiscoveryData() {
+ return newNodeDiscoData;
+ }
+
+ /**
+ * @return Discovery data from old nodes.
+ */
+ public Collection<List<Object>> oldNodesDiscoveryData() {
+ return oldNodesDiscoData;
+ }
+
+ /**
+ * @param discoData Discovery data to add.
+ */
+ public void addDiscoveryData(List<Object> discoData) {
+ // Old nodes disco data may be null if message
+ // makes more than 1 pass due to stopping of the nodes in topology.
+ if (oldNodesDiscoData != null)
+ oldNodesDiscoData.add(discoData);
+ }
+
+ /**
+ * Clears discovery data to minimize message size.
+ */
+ public void clearDiscoveryData() {
+ newNodeDiscoData = null;
+ oldNodesDiscoData = null;
+ }
+
+ /**
+ * @return First grid node start time.
+ */
+ public long gridStartTime() {
+ return gridStartTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ out.writeObject(node);
+ U.writeCollection(out, msgs);
+ U.writeGridUuid(out, discardMsgId);
+ U.writeCollection(out, top);
+ U.writeMap(out, topHist);
+ out.writeLong(gridStartTime);
+ U.writeCollection(out, newNodeDiscoData);
+
+ out.writeInt(oldNodesDiscoData != null ? oldNodesDiscoData.size() : -1);
+
+ if (oldNodesDiscoData != null) {
+ for (List<Object> list : oldNodesDiscoData)
+ U.writeCollection(out, list);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ node = (TcpDiscoveryNode)in.readObject();
+ msgs = U.readCollection(in);
+ discardMsgId = U.readGridUuid(in);
+ top = U.readCollection(in);
+ topHist = U.readTreeMap(in);
+ gridStartTime = in.readLong();
+ newNodeDiscoData = U.readList(in);
+
+ int oldNodesDiscoDataSize = in.readInt();
+
+ if (oldNodesDiscoDataSize >= 0) {
+ oldNodesDiscoData = new ArrayList<>(oldNodesDiscoDataSize);
+
+ for (int i = 0; i < oldNodesDiscoDataSize; i++)
+ oldNodesDiscoData.add(U.readList(in));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryNodeAddedMessage.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
new file mode 100644
index 0000000..8a40122
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
@@ -0,0 +1,93 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Sent by node that has detected node failure to coordinator across the ring,
+ * then sent by coordinator across the ring.
+ */
+@TcpDiscoveryEnsureDelivery
+@TcpDiscoveryRedirectToClient
+public class TcpDiscoveryNodeFailedMessage extends TcpDiscoveryAbstractMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Failed node ID. */
+    private UUID failedNodeId;
+
+    /** Failed node internal order. */
+    private long order;
+
+    /**
+     * Empty public constructor needed by the {@link Externalizable} interface.
+     */
+    public TcpDiscoveryNodeFailedMessage() {
+        // No-op.
+    }
+
+    /**
+     * Creates the message for the given failed node.
+     *
+     * @param creatorNodeId ID of the node that detected the node failure.
+     * @param failedNodeId ID of the failed node, must not be {@code null}.
+     * @param order Order of the failed node, must be positive.
+     */
+    public TcpDiscoveryNodeFailedMessage(UUID creatorNodeId, UUID failedNodeId, long order) {
+        super(creatorNodeId);
+
+        assert failedNodeId != null;
+        assert order > 0;
+
+        this.order = order;
+        this.failedNodeId = failedNodeId;
+    }
+
+    /**
+     * @return ID of the failed node.
+     */
+    public UUID failedNodeId() {
+        return this.failedNodeId;
+    }
+
+    /**
+     * @return Internal order of the failed node.
+     */
+    public long order() {
+        return this.order;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        U.writeUuid(out, this.failedNodeId);
+        out.writeLong(this.order);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        this.failedNodeId = U.readUuid(in);
+        this.order = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        String parent = super.toString();
+
+        return S.toString(TcpDiscoveryNodeFailedMessage.class, this, "super", parent);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java
new file mode 100644
index 0000000..2e49505
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Sent by node that is stopping to coordinator across the ring,
+ * then sent by coordinator across the ring.
+ */
+@TcpDiscoveryEnsureDelivery
+@TcpDiscoveryRedirectToClient
+public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Public default no-arg constructor for {@link Externalizable} interface.
+ */
+ public TcpDiscoveryNodeLeftMessage() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param creatorNodeId ID of the node that is about to leave the topology.
+ */
+ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) {
+ super(creatorNodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
new file mode 100644
index 0000000..e2e5f51
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java
@@ -0,0 +1,65 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Ping request.
+ */
+public class TcpDiscoveryPingRequest extends TcpDiscoveryAbstractMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** ID of the pinged client node, may be {@code null}. */
+    private UUID clientNodeId;
+
+    /**
+     * Empty public constructor needed by the {@link Externalizable} interface.
+     */
+    public TcpDiscoveryPingRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param clientNodeId Pinged client node ID.
+     */
+    public TcpDiscoveryPingRequest(UUID creatorNodeId, @Nullable UUID clientNodeId) {
+        super(creatorNodeId);
+
+        this.clientNodeId = clientNodeId;
+    }
+
+    /**
+     * @return ID of the pinged client node, or {@code null} if none was set.
+     */
+    @Nullable public UUID clientNodeId() {
+        return this.clientNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        U.writeUuid(out, this.clientNodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        this.clientNodeId = U.readUuid(in);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
new file mode 100644
index 0000000..18a5be1
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java
@@ -0,0 +1,66 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Ping response message carrying a flag that tells whether the pinged client exists.
+ */
+public class TcpDiscoveryPingResponse extends TcpDiscoveryAbstractMessage {
+ /** Serialization version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Whether pinged client exists; stays {@code false} unless set via {@link #clientExists(boolean)}. */
+ private boolean clientExists;
+
+ /**
+ * Empty constructor for {@link Externalizable}; state is restored in {@link #readExternal(ObjectInput)}.
+ */
+ public TcpDiscoveryPingResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param creatorNodeId Creator node ID, passed to the superclass constructor.
+ */
+ public TcpDiscoveryPingResponse(UUID creatorNodeId) {
+ super(creatorNodeId);
+ }
+
+ /**
+ * @param clientExists Whether pinged client exists; the value is stored as is.
+ */
+ public void clientExists(boolean clientExists) {
+ this.clientExists = clientExists;
+ }
+
+ /**
+ * @return {@code true} if the pinged client was flagged as existing, {@code false} otherwise.
+ */
+ public boolean clientExists() {
+ return clientExists;
+ }
+
+ /** {@inheritDoc} Writes the superclass state first, then the {@code clientExists} flag. */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ out.writeBoolean(clientExists);
+ }
+
+ /** {@inheritDoc} Reads fields in the same order {@link #writeExternal(ObjectOutput)} wrote them. */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ clientExists = in.readBoolean();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryRedirectToClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryRedirectToClient.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryRedirectToClient.java
new file mode 100644
index 0000000..972067b
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryRedirectToClient.java
@@ -0,0 +1,23 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import java.lang.annotation.*;
+
+/**
+ * Marker annotation: message classes carrying it will be redirected to client nodes when going
+ * through the ring. It is retained at runtime and applicable to type declarations only.
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TcpDiscoveryRedirectToClient {
+ // No-op: the annotation declares no elements.
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
new file mode 100644
index 0000000..3481488
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
@@ -0,0 +1,123 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.spi.discovery.tcp.messages;
+
+import org.gridgain.grid.spi.discovery.tcp.internal.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Message sent by a node to its next node to ensure that the next node and the
+ * connection to it are alive. The receiving node should pass it along the ring
+ * until the message reaches the coordinator. The coordinator responds directly to the node.
+ * <p>
+ * If a failed node id is specified then the message is sent across the ring up to the sender node
+ * to ensure that the failed node has actually failed.
+ */
+public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage {
+ /** Serialization version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Status OK. */
+ public static final int STATUS_OK = 1;
+
+ /** Status RECONNECT. */
+ public static final int STATUS_RECON = 2;
+
+ /** Creator node. */
+ private TcpDiscoveryNode creatorNode;
+
+ /** Failed node id (optional according to the class description). */
+ private UUID failedNodeId;
+
+ /** Creator node status (initialized by coordinator); {@code 0} until {@link #status(int)} is called. */
+ private int status;
+
+ /**
+ * Public default no-arg constructor for {@link Externalizable} interface; fields are restored on read.
+ */
+ public TcpDiscoveryStatusCheckMessage() {
+ // No-op.
+ }
+
+ /**
+ * Creates a message whose creator node ID is taken from {@code creatorNode.id()}.
+ *
+ * @param creatorNode Creator node; must not be {@code null} because its ID is read here.
+ * @param failedNodeId Failed node id, stored as is (NOTE(review): presumably nullable - confirm).
+ */
+ public TcpDiscoveryStatusCheckMessage(TcpDiscoveryNode creatorNode, UUID failedNodeId) {
+ super(creatorNode.id());
+
+ this.creatorNode = creatorNode;
+ this.failedNodeId = failedNodeId;
+ }
+
+ /**
+ * Gets creator node.
+ *
+ * @return Creator node.
+ */
+ public TcpDiscoveryNode creatorNode() {
+ return creatorNode;
+ }
+
+ /**
+ * Gets failed node id.
+ *
+ * @return Failed node id as passed to the constructor or read from the stream.
+ */
+ public UUID failedNodeId() {
+ return failedNodeId;
+ }
+
+ /**
+ * Gets creator status.
+ *
+ * @return Creator node status (see {@link #STATUS_OK} and {@link #STATUS_RECON}).
+ */
+ public int status() {
+ return status;
+ }
+
+ /**
+ * Sets creator node status (should be set by coordinator).
+ *
+ * @param status Creator node status; stored without validation.
+ */
+ public void status(int status) {
+ this.status = status;
+ }
+
+ /** {@inheritDoc} Writes superclass state, then creator node, failed node id and status, in that order. */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ out.writeObject(creatorNode);
+ U.writeUuid(out, failedNodeId);
+ out.writeInt(status);
+ }
+
+ /** {@inheritDoc} Reads fields in the same order {@link #writeExternal(ObjectOutput)} wrote them. */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ creatorNode = (TcpDiscoveryNode)in.readObject();
+ failedNodeId = U.readUuid(in);
+ status = in.readInt();
+ }
+
+ /** {@inheritDoc} Delegates to {@code S.toString}, passing {@code super.toString()} under the name "super". */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cdb10bc1/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java
index bd51479..cfb7b92 100644
--- a/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/spi/discovery/tcp/GridTcpDiscoverySelfTest.java
@@ -958,7 +958,7 @@ public class GridTcpDiscoverySelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override void onBeforeMessageSentAcrossRing(Serializable msg) {
- if (msg instanceof GridTcpDiscoveryNodeAddedMessage)
+ if (msg instanceof TcpDiscoveryNodeAddedMessage)
if (++i == 2) {
simulateNodeFailure();
@@ -973,7 +973,7 @@ public class GridTcpDiscoverySelfTest extends GridCommonAbstractTest {
private static class FailBeforeNodeLeftSentSpi extends TcpDiscoverySpi {
/** {@inheritDoc} */
@Override void onBeforeMessageSentAcrossRing(Serializable msg) {
- if (msg instanceof GridTcpDiscoveryNodeLeftMessage) {
+ if (msg instanceof TcpDiscoveryNodeLeftMessage) {
simulateNodeFailure();
throw new RuntimeException("Avoid message sending: " + msg.getClass());