You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/17 21:00:27 UTC

incubator-ignite git commit: ignite-752: implemented connection check message

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-752 6bb1875fd -> 347eb70c8


ignite-752: implemented connection check message


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/347eb70c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/347eb70c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/347eb70c

Branch: refs/heads/ignite-752
Commit: 347eb70c8d95b8922bc1a79a6dc9e791b56c9ba5
Parents: 6bb1875
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Jul 17 21:59:04 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Jul 17 21:59:04 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 190 +++++++++++++++++--
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  10 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  40 +++-
 .../tcp/internal/TcpDiscoveryNode.java          |  19 ++
 .../TcpDiscoveryConnectionCheckMessage.java     |  45 +++++
 5 files changed, 291 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3a50c31..74c7dbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -88,6 +88,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private CheckStatusSender chkStatusSnd;
 
+    /** Connection checker. */
+    private CheckConnectionWorker chkConnWorker;
+
     /** IP finder cleaner. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private IpFinderCleaner ipFinderCleaner;
@@ -232,8 +235,14 @@ class ServerImpl extends TcpDiscoveryImpl {
         hbsSnd = new HeartbeatsSender();
         hbsSnd.start();
 
-        chkStatusSnd = new CheckStatusSender();
-        chkStatusSnd.start();
+        if (spi.failureDetectionThresholdEnabled()) {
+            chkConnWorker = new CheckConnectionWorker();
+            chkConnWorker.start();
+        }
+        else {
+            chkStatusSnd = new CheckStatusSender();
+            chkStatusSnd.start();
+        }
 
         if (spi.ipFinder.isShared()) {
             ipFinderCleaner = new IpFinderCleaner();
@@ -323,8 +332,14 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(hbsSnd);
         U.join(hbsSnd, log);
 
-        U.interrupt(chkStatusSnd);
-        U.join(chkStatusSnd, log);
+        if (spi.failureDetectionThresholdEnabled()) {
+            U.interrupt(chkConnWorker);
+            U.join(chkConnWorker, log);
+        }
+        else {
+            U.interrupt(chkStatusSnd);
+            U.join(chkStatusSnd, log);
+        }
 
         U.interrupt(ipFinderCleaner);
         U.join(ipFinderCleaner, log);
@@ -619,6 +634,14 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected void onDataRead() {
+        if (spi.failureDetectionThresholdEnabled()) {
+            locNode.lastDataReceivedTime(U.currentTimeMillis());
+            chkConnWorker.reset();
+        }
+    }
+
     /**
      * Tries to join this node to topology.
      *
@@ -1287,8 +1310,14 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(hbsSnd);
         U.join(hbsSnd, log);
 
-        U.interrupt(chkStatusSnd);
-        U.join(chkStatusSnd, log);
+        if (spi.failureDetectionThresholdEnabled()) {
+            U.interrupt(chkConnWorker);
+            U.join(chkConnWorker, log);
+        }
+        else {
+            U.interrupt(chkStatusSnd);
+            U.join(chkStatusSnd, log);
+        }
 
         U.interrupt(ipFinderCleaner);
         U.join(ipFinderCleaner, log);
@@ -1378,7 +1407,12 @@ class ServerImpl extends TcpDiscoveryImpl {
             b.append("Internal threads: ").append(U.nl());
 
             b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
-            b.append("    Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
+
+            if (spi.failureDetectionThresholdEnabled())
+                b.append("    Check connectino worker: ").append(threadStatus(chkConnWorker)).append(U.nl());
+            else
+                b.append("    Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
+
             b.append("    HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
             b.append("    IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
             b.append("    Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
@@ -1426,7 +1460,8 @@ class ServerImpl extends TcpDiscoveryImpl {
     private boolean recordable(TcpDiscoveryAbstractMessage msg) {
         return !(msg instanceof TcpDiscoveryHeartbeatMessage) &&
             !(msg instanceof TcpDiscoveryStatusCheckMessage) &&
-            !(msg instanceof TcpDiscoveryDiscardMessage);
+            !(msg instanceof TcpDiscoveryDiscardMessage) &&
+            !(msg instanceof TcpDiscoveryConnectionCheckMessage);
     }
 
     /**
@@ -1568,6 +1603,82 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * TODO: IGNITE-752
+     */
+    private class CheckConnectionWorker extends IgniteSpiThread {
+        /** */
+        private volatile boolean msgInQueue;
+
+        /** */
+        private volatile boolean logMsgPrinted;
+
+        /**
+         * Constructor
+         */
+        public CheckConnectionWorker() {
+            super(spi.ignite().name(), "tcp-disco-conn-check-worker", log);
+
+            setPriority(spi.threadPri);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Connection check worker has been started.");
+
+            while (!isInterrupted()) {
+                if (spiStateCopy() != CONNECTED) {
+                    if (log.isDebugEnabled())
+                        log.debug("Stopping connection check worker (SPI is not connected to topology).");
+
+                    return;
+                }
+
+                if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() &&
+                    ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+
+                    if (!logMsgPrinted) {
+                        log.info("Local node seems to be disconnected from topology (failure detection threshold " +
+                            "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
+                            ", connCheckFreq=" + spi.connCheckFreq + ']');
+
+                        logMsgPrinted = true;
+                    }
+                }
+
+                if (msgInQueue) {
+                    Thread.sleep(spi.connCheckFreq);
+
+                    continue;
+                }
+
+                if (ring.hasRemoteNodes()) {
+                    // Send the message using ring message worker in order to reuse an existed socket to the next node.
+                    msgInQueue = true;
+
+                    msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode));
+                }
+
+                Thread.sleep(spi.connCheckFreq);
+            }
+        }
+
+        /**
+         * TODO: IGNITE-752
+         */
+        private void reset() {
+            logMsgPrinted = false;
+        }
+
+        /**
+         * TODO: IGNITE-752
+         */
+        private void messageProcessed() {
+            msgInQueue = false;
+        }
+    }
+
+    /**
      * Thread that cleans IP finder and keeps it in the correct state, unregistering
      * addresses of the nodes that has left the topology.
      * <p>
@@ -1910,6 +2021,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (msg instanceof TcpDiscoveryJoinRequestMessage)
                 processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
 
+            else if (msg instanceof TcpDiscoveryConnectionCheckMessage)
+                processConnectionCheckMessage((TcpDiscoveryConnectionCheckMessage)msg);
+
             else if (msg instanceof TcpDiscoveryClientReconnectMessage)
                 processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
 
@@ -1956,9 +2070,10 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Sends message across the ring.
          *
          * @param msg Message to send
+         * @return Response code.
          */
         @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"})
-        private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
+        private int sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
             assert msg != null;
 
             assert ring.hasRemoteNodes();
@@ -2007,6 +2122,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             UUID locNodeId = getLocalNodeId();
 
+            int msgRes = RES_FAIL;
+
             while (true) {
                 if (searchNext) {
                     TcpDiscoveryNode newNext = ring.nextNode(failedNodes);
@@ -2272,6 +2389,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 }
                             }
 
+                            if (msg instanceof TcpDiscoveryConnectionCheckMessage && next.version().greaterThanEqual(
+                                    TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+                                    TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
+                                    TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) {
+                                // Preserve backward compatibility with nodes of older versions.
+                                assert msg.creatorNodeId().equals(getLocalNodeId());
+
+                                msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
+                            }
+
                             prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
 
                             try {
@@ -2284,17 +2411,17 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
 
-                                int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
+                                msgRes = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
 
                                 if (log.isDebugEnabled())
                                     log.debug("Message has been sent to next node [msg=" + msg +
                                         ", next=" + next.id() +
-                                        ", res=" + res + ']');
+                                        ", res=" + msgRes + ']');
 
                                 if (debugMode)
                                     debugLog("Message has been sent to next node [msg=" + msg +
                                         ", next=" + next.id() +
-                                        ", res=" + res + ']');
+                                        ", res=" + msgRes + ']');
                             }
                             finally {
                                 clearNodeAddedMessage(msg);
@@ -2421,6 +2548,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         "To speed up failure detection please see 'Failure Detection' section under javadoc" +
                         " for 'TcpDiscoverySpi'");
             }
+
+            return msgRes;
         }
 
         /**
@@ -3847,6 +3976,35 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * TODO: IGNITE-752
+         * @param msg
+         */
+        private void processConnectionCheckMessage(TcpDiscoveryConnectionCheckMessage msg) {
+            assert msg.creatorNodeId().equals(getLocalNodeId()) && msg.senderNodeId() == null;
+
+            if (spiStateCopy() != CONNECTED) {
+                if (log.isDebugEnabled())
+                    log.debug("Connection check message discarded (local node receives updates).");
+
+                chkConnWorker.messageProcessed();
+                return;
+            }
+
+            int res = RES_FAIL;
+
+            if (ring.hasRemoteNodes())
+                res = sendMessageAcrossRing(msg);
+
+            chkConnWorker.messageProcessed();
+
+            if (res == TcpDiscoveryConnectionCheckMessage.STATUS_RECON) {
+                U.warn(log, "Node is out of topology (probably, due to short-time network problems).");
+
+                notifyDiscovery(EVT_NODE_SEGMENTED, ring.topologyVersion(), locNode);
+            }
+        }
+
+        /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
          * @param cacheMetrics Cache metrics.
@@ -4261,6 +4419,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         return;
                     }
 
+
+
                     // Handshake.
                     TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
 
@@ -4418,6 +4578,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     break;
                             }
                         }
+                        else if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+                            spi.writeToSocket(msg, sock, ring.node(msg.creatorNodeId()) != null ? RES_OK :
+                                TcpDiscoveryConnectionCheckMessage.STATUS_RECON, socketTimeout);
+
+                            continue;
+                        }
                         else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
                             if (clientMsgWrk != null) {
                                 TcpDiscoverySpiState state = spiStateCopy();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index cf113a2..85b1f38 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -37,6 +37,9 @@ abstract class TcpDiscoveryImpl {
     /** Response OK. */
     protected static final int RES_OK = 1;
 
+    /** Response FAIL. */
+    protected static final int RES_FAIL = -1;
+
     /** Response CONTINUE JOIN. */
     protected static final int RES_CONTINUE_JOIN = 100;
 
@@ -131,6 +134,13 @@ abstract class TcpDiscoveryImpl {
     }
 
     /**
+     * TODO: IGNITE-752
+     */
+    protected void onDataRead() {
+        // No-op
+    }
+
+    /**
      * @param log Logger.
      */
     public abstract void dumpDebugInfo(IgniteLogger log);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 8d99bcd..fcba8c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -157,6 +157,15 @@ import java.util.concurrent.atomic.*;
 @DiscoverySpiOrderSupport(true)
 @DiscoverySpiHistorySupport(true)
 public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+    /** Failure detection threshold feature major version. */
+    final static int FAILURE_DETECTION_MAJOR_VER = 1;
+
+    /** Failure detection threshold feature minor version. */
+    final static int FAILURE_DETECTION_MINOR_VER = 3;
+
+    /** Failure detection threshold feature maintainance version. */
+    final static int FAILURE_DETECTION_MAINT_VER = 1;
+
     /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
     public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
 
@@ -202,6 +211,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). */
     public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5;
 
+    /** Default connection check frequency. */
+    public static final int DFLT_CONN_CHECK_FREQ = 1000;
+
     /** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */
     public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000;
 
@@ -242,6 +254,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Size of topology snapshots history. */
     protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
 
+    /** Connection check frequency. Used in conjunction with failure detection threshold. */
+    protected long connCheckFreq = DFLT_CONN_CHECK_FREQ;
+
     /** Grid discovery listener. */
     protected volatile DiscoverySpiListener lsnr;
 
@@ -828,6 +843,19 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
+     * TODO: IGNITE-752
+     *
+     * @param connCheckFreq
+     * @return
+     */
+    @IgniteSpiConfiguration(optional =  true)
+    public TcpDiscoverySpi setConnectionCheckFrequency(long connCheckFreq) {
+        this.connCheckFreq = connCheckFreq;
+
+        return this;
+    }
+
+    /**
      * @return Size of topology snapshots history.
      */
     public long getTopHistorySize() {
@@ -1332,7 +1360,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         try {
             sock.setSoTimeout((int)timeout);
 
-            return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+            T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+
+            impl.onDataRead();
+
+            return res;
         }
         catch (IOException | IgniteCheckedException e) {
             if (X.hasCause(e, SocketTimeoutException.class))
@@ -1373,6 +1405,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             if (res == -1)
                 throw new EOFException();
 
+            impl.onDataRead();
+
             return res;
         }
         catch (SocketTimeoutException e) {
@@ -1648,6 +1682,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             log.debug(configInfo("threadPri", threadPri));
 
             if (!failureDetectionThresholdEnabled()) {
+                log.debug("Failure detection threshold is disabled and connection check frequency is ignored because " +
+                    "at least one of the parameters from this list has been set manually: 'networkTimeout'," +
+                    " 'sockTimeout', 'ackTimeout', 'maxAckTimeout', 'reconnectCount'.");
+
                 log.debug(configInfo("networkTimeout", netTimeout));
                 log.debug(configInfo("sockTimeout", sockTimeout));
                 log.debug(configInfo("ackTimeout", ackTimeout));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 032cf01..b33c0c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -89,6 +89,9 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
     @GridToStringExclude
     private volatile long lastUpdateTime = U.currentTimeMillis();
 
+    /** The most recent time when a data chunk was received from a node. */
+    private volatile long lastDataReceivedTime = U.currentTimeMillis();
+
     /** Metrics provider (transient). */
     @GridToStringExclude
     private DiscoveryMetricsProvider metricsProvider;
@@ -385,6 +388,22 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
     }
 
     /**
+     * TODO: IGNITE-752
+     * @return
+     */
+    public long lastDataReceivedTime() {
+        return lastDataReceivedTime;
+    }
+
+    /**
+     * TODO: IGNITE-752
+     * @param lastDataReceivedTime
+     */
+    public void lastDataReceivedTime(long lastDataReceivedTime) {
+        this.lastDataReceivedTime = lastDataReceivedTime;
+    }
+
+    /**
      * Gets visible flag.
      *
      * @return {@code true} if node is in visible state.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/347eb70c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
new file mode 100644
index 0000000..3249220
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+
+/**
+ * Message used to check whether a node is still connected to the topology.
+ * The difference from {@link TcpDiscoveryStatusCheckMessage} is that this message is sent to the next node
+ * which directly replies to the sender without message re-translation to the coordinator.
+ */
+public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage {
+    /** Status RECONNECT. */
+    public static final int STATUS_RECON = 500;
+
+    /**
+     * Constructor.
+     *
+     * @param creatorNode Node created this message.
+     */
+    public TcpDiscoveryConnectionCheckMessage(TcpDiscoveryNode creatorNode) {
+        super(creatorNode.id());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TcpDiscoveryConnectionCheckMessage.class, this, "super", super.toString());
+    }
+}