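You are viewing a plain-text version of this content. The canonical (HTML) version is linked from the original mailing-list archive page; the hyperlink itself was lost in this plain-text rendering.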
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/21 16:35:36 UTC

incubator-ignite git commit: ignite-752: removed heartbeats sender and status checker threads

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-752 9319206bd -> c453ab8dc


ignite-752: removed heartbeats sender and status checker threads


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c453ab8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c453ab8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c453ab8d

Branch: refs/heads/ignite-752
Commit: c453ab8dc19327ac8b7df8350bd65ed0e237c3ac
Parents: 9319206
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Jul 21 17:35:23 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Jul 21 17:35:23 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 423 +++++--------------
 ...TcpDiscoverySpiFailureThresholdSelfTest.java |   5 +-
 2 files changed, 114 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c453ab8d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 02e13ba..2a09c62 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -80,14 +80,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Client message workers. */
     protected ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
 
-    /** Metrics sender. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private HeartbeatsSender hbsSnd;
-
-    /** Status checker. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private CheckStatusSender chkStatusSnd;
-
     /** IP finder cleaner. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private IpFinderCleaner ipFinderCleaner;
@@ -229,12 +221,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         spi.stats.onJoinFinished();
 
-        hbsSnd = new HeartbeatsSender();
-        hbsSnd.start();
-
-        chkStatusSnd = spi.failureDetectionThresholdEnabled() ? new CheckConnectionWorker() : new CheckStatusSender();
-        chkStatusSnd.start();
-
         if (spi.ipFinder.isShared()) {
             ipFinderCleaner = new IpFinderCleaner();
             ipFinderCleaner.start();
@@ -319,12 +305,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(tmp);
         U.joinThreads(tmp, log);
 
-        U.interrupt(hbsSnd);
-        U.join(hbsSnd, log);
-
-        U.interrupt(chkStatusSnd);
-        U.join(chkStatusSnd, log);
-
         U.interrupt(ipFinderCleaner);
         U.join(ipFinderCleaner, log);
 
@@ -624,8 +604,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (locNode != null)
                 locNode.lastDataReceivedTime(U.currentTimeMillis());
 
-            if (chkStatusSnd != null)
-                chkStatusSnd.onDataReceived();
+            if (msgWorker != null)
+                // Node receives messages from remote nodes, reset this flag.
+                msgWorker.failureDetectionNotified = false;
         }
     }
 
@@ -1291,12 +1272,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(tcpSrvr);
         U.join(tcpSrvr, log);
 
-        U.interrupt(hbsSnd);
-        U.join(hbsSnd, log);
-
-        U.interrupt(chkStatusSnd);
-        U.join(chkStatusSnd, log);
-
         U.interrupt(ipFinderCleaner);
         U.join(ipFinderCleaner, log);
 
@@ -1386,12 +1361,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
 
-            if (spi.failureDetectionThresholdEnabled())
-                b.append("    Check connection worker: ").append(threadStatus(chkStatusSnd)).append(U.nl());
-            else
-                b.append("    Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
-
-            b.append("    HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
             b.append("    IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
             b.append("    Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
 
@@ -1475,261 +1444,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
-     * Thread that sends heartbeats.
-     */
-    private class HeartbeatsSender extends IgniteSpiThread {
-        /**
-         * Constructor.
-         */
-        private HeartbeatsSender() {
-            super(spi.ignite().name(), "tcp-disco-hb-sender", log);
-
-            setPriority(spi.threadPri);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("BusyWait")
-        @Override protected void body() throws InterruptedException {
-            while (!isLocalNodeCoordinator())
-                Thread.sleep(1000);
-
-            if (log.isDebugEnabled())
-                log.debug("Heartbeats sender has been started.");
-
-            UUID nodeId = getConfiguredNodeId();
-
-            while (!isInterrupted()) {
-                if (spiStateCopy() != CONNECTED) {
-                    if (log.isDebugEnabled())
-                        log.debug("Stopping heartbeats sender (SPI is not connected to topology).");
-
-                    return;
-                }
-
-                TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId);
-
-                msg.verify(getLocalNodeId());
-
-                msgWorker.addMessage(msg);
-
-                Thread.sleep(spi.hbFreq);
-            }
-        }
-    }
-
-    /**
-     * Thread that sends status check messages to next node if local node has not
-     * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
-     * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
-     * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
-     */
-    private class CheckStatusSender extends IgniteSpiThread {
-        /** Heartbeats check timeout. */
-        protected long hbCheckTimeout;
-
-        /** Time when the last status message has been sent. */
-        protected long lastTimeStatusMsgSent;
-
-        /**
-         * Constructor.
-         */
-        private CheckStatusSender() {
-            super(spi.ignite().name(), "tcp-disco-status-check-sender", log);
-
-            init();
-        }
-
-        /**
-         * Constructor.
-         *
-         * @param threadName Name of the thread.
-         */
-        private CheckStatusSender(String threadName) {
-            super(spi.ignite().name(), threadName, log);
-
-            init();
-        }
-
-        /**
-         * Inits {@code CheckStatusSender}.
-         */
-        private void init() {
-            // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
-            hbCheckTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
-
-            setPriority(spi.threadPri);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("BusyWait")
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Status check sender has been started.");
-
-            while (!isInterrupted()) {
-                if (spiStateCopy() != CONNECTED) {
-                    if (log.isDebugEnabled())
-                        log.debug("Stopping status check sender (SPI is not connected to topology).");
-
-                    return;
-                }
-
-                long hbTimeout = checkHeartbeats();
-
-                assert hbTimeout > 0;
-
-                if (hbTimeout > 0)
-                    Thread.sleep(hbTimeout);
-            }
-        }
-
-        /**
-         * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than
-         * {@link TcpDiscoveryStatusCheckMessage} is sent accross the ring.
-         *
-         * @return Timeout to wait before calling this function again.
-         */
-        protected long checkHeartbeats() {
-            // 1. Determine timeout.
-            if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
-                lastTimeStatusMsgSent = locNode.lastUpdateTime();
-
-            long timeout = (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis();
-
-            // 2. Still need to wait before sending?
-            if (timeout > 0)
-                return timeout;
-
-            msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
-
-            // 3. Send status check message.
-            lastTimeStatusMsgSent = U.currentTimeMillis();
-
-            return (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis();
-        }
-
-        /**
-         * Called when a chunck of data is received from a remote node.
-         */
-        protected void onDataReceived() {
-            // No-op
-        }
-
-        /**
-         * Signals that a message added to the messages queue by this thread has been processed.
-         */
-        protected void messageProcessed() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    /**
-     * Thread performs the following two tasks:
-     * <ul>
-     * <li>
-     * Send connection check message to the next node with {@link TcpDiscoverySpi#connCheckFreq} frequency;
-     * </li>
-     * <li>
-     * Sends status check messages to next node if local node has not
-     * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
-     * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
-     * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
-     * </li>
-     * </ul>
-     */
-    private class CheckConnectionWorker extends CheckStatusSender {
-        /** */
-        private volatile boolean msgInQueue;
-
-        /** */
-        private volatile boolean logMsgPrinted;
-
-        /** Time when the last status message has been sent. */
-        private long lastTimeConnCheckMsgSent;
-
-        /**
-         * Constructor
-         */
-        public CheckConnectionWorker() {
-            super("tcp-disco-conn-check-worker");
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Connection check worker has been started.");
-
-            while (!isInterrupted()) {
-                if (spiStateCopy() != CONNECTED) {
-                    if (log.isDebugEnabled())
-                        log.debug("Stopping connection check worker (SPI is not connected to topology).");
-
-                    return;
-                }
-
-                long hbTimeout = checkHeartbeats();
-
-                assert hbTimeout > 0;
-
-                long connTimeout = checkConnection();
-
-                assert connTimeout > 0;
-
-                Thread.sleep(Math.min(hbTimeout, connTimeout));
-            }
-        }
-
-        /**
-         * Check connection aliveness status.
-         *
-         * @return Timeout to wait before caliing this method the next time.
-         */
-        private long checkConnection() {
-            if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() &&
-                ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
-
-                if (!logMsgPrinted) {
-                    log.info("Local node seems to be disconnected from topology (failure detection threshold " +
-                        "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
-                        ", connCheckFreq=" + spi.connCheckFreq + ']');
-
-                    logMsgPrinted = true;
-                }
-            }
-
-            if (msgInQueue)
-                return spi.connCheckFreq;
-
-            long timeout = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
-
-            if (timeout > 0)
-                return timeout;
-
-            if (ring.hasRemoteNodes()) {
-                // Send the message using ring message worker in order to reuse an existed socket to the next node.
-                msgInQueue = true;
-
-                msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode));
-
-                lastTimeConnCheckMsgSent = U.currentTimeMillis();
-            }
-
-            return spi.connCheckFreq;
-        }
-
-
-        /** {@inheritDoc} */
-        @Override protected void onDataReceived() {
-            logMsgPrinted = false;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void messageProcessed() {
-            msgInQueue = false;
-        }
-    }
-
-    /**
      * Thread that cleans IP finder and keeps it in the correct state, unregistering
      * addresses of the nodes that has left the topology.
      * <p>
@@ -2051,6 +1765,21 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private Socket sock;
 
+        /** Last time status message has been sent. */
+        private long lastTimeStatusMsgSent;
+
+        /** Incoming heartbeats check frequency. */
+        private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50;
+
+        /** Time when the last status message has been sent. */
+        private long lastTimeConnCheckMsgSent;
+
+        /** Whether an error message has been printed out when failure detection threshold is reached. */
+        private volatile boolean failureDetectionNotified;
+
+        /** Last time hearbeat message has been sent. */
+        private long lastTimeHbMsgSent;
+
         /**
          */
         protected RingMessageWorker() {
@@ -2117,6 +1846,15 @@ class ServerImpl extends TcpDiscoveryImpl {
             spi.stats.onMessageProcessingFinished(msg);
         }
 
+        /** {@inheritDoc} */
+        @Override protected void noMessageLoop() {
+            checkConnection();
+
+            sendHeartbeatMessage();
+
+            checkHeartbeatsReceiving();
+        }
+
         /**
          * Sends message across the ring.
          *
@@ -4039,7 +3777,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (log.isDebugEnabled())
                     log.debug("Connection check message discarded (local node is leaving topology).");
 
-                chkStatusSnd.messageProcessed();
                 return;
             }
 
@@ -4047,27 +3784,19 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (log.isDebugEnabled())
                     log.debug("Connection check message discarded (no next node in topology).");
 
-                chkStatusSnd.messageProcessed();
                 return;
             }
 
-            try {
-                // Link to the 'next' node is updated only inside RingMessageWorker thread, no need to check on 'null'.
-                if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
-                    TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
-                    TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) {
-                    // Preserve backward compatibility with nodes of older versions.
-                    TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null);
-                    stMsg.replacedConnCheckMsg(true);
-
-                    processStatusCheckMessage(stMsg);
-                }
-                else if (ring.hasRemoteNodes())
-                    sendMessageAcrossRing(msg);
-            }
-            finally {
-                chkStatusSnd.messageProcessed();
-            }
+            // Link to the 'next' node is updated only inside RingMessageWorker thread, no need to check on 'null'.
+            if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+                TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) {
+                // Preserve backward compatibility with nodes of older versions.
+                TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null);
+                stMsg.replacedConnCheckMsg(true);
+
+                processStatusCheckMessage(stMsg);
+            } else if (ring.hasRemoteNodes())
+                sendMessageAcrossRing(msg);
         }
 
         /**
@@ -4272,6 +4001,71 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
         }
+
+        /**
+         * Sends heartbeat message if needed.
+         */
+        private void sendHeartbeatMessage() {
+            if (!isLocalNodeCoordinator())
+                return;
+
+            long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
+
+            if (elapsed > 0)
+                return;
+
+            TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
+
+            msg.verify(getLocalNodeId());
+
+            msgWorker.addMessage(msg);
+
+            lastTimeHbMsgSent = U.currentTimeMillis();
+        }
+
+        /**
+         * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than
+         * {@link TcpDiscoveryStatusCheckMessage} is sent accros the ring.
+         */
+        private void checkHeartbeatsReceiving() {
+            if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
+                lastTimeStatusMsgSent = locNode.lastUpdateTime();
+
+            long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+
+            if (elapsed > 0)
+                return;
+
+            msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+
+            lastTimeStatusMsgSent = U.currentTimeMillis();
+        }
+
+        /**
+         * Check connection aliveness status.
+         */
+        private void checkConnection() {
+            if (!failureDetectionNotified && U.currentTimeMillis() - locNode.lastDataReceivedTime()
+                >= spi.failureDetectionThreshold() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+
+                log.info("Local node seems to be disconnected from topology (failure detection threshold " +
+                             "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
+                             ", connCheckFreq=" + spi.connCheckFreq + ']');
+
+                failureDetectionNotified = true;
+            }
+
+            long elapsed = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
+
+            if (elapsed > 0)
+                return;
+
+            if (ring.hasRemoteNodes()) {
+                processConnectionCheckMessage(new TcpDiscoveryConnectionCheckMessage(locNode));
+
+                lastTimeConnCheckMsgSent = U.currentTimeMillis();
+            }
+        }
     }
 
     /**
@@ -5219,10 +5013,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
 
             while (!isInterrupted()) {
-                TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
+                TcpDiscoveryAbstractMessage msg = queue.poll(10, TimeUnit.MILLISECONDS);
 
-                if (msg == null)
+                if (msg == null) {
+                    noMessageLoop();
                     continue;
+                }
 
                 processMessage(msg);
             }
@@ -5268,6 +5064,13 @@ class ServerImpl extends TcpDiscoveryImpl {
         protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
 
         /**
+         * Called when there is no message to process giving ability to perform other activity.
+         */
+        protected void noMessageLoop() {
+            // No-op.
+        }
+
+        /**
          * @param sock Socket.
          * @param msg Message.
          * @param timeout Socket timeout.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c453ab8d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
index 4fdf886..362be15 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -37,7 +37,7 @@ import java.net.*;
  */
 public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySelfTest {
     /** */
-    private static final int SPI_COUNT = 7;
+    private static final int SPI_COUNT = 6;
 
     /** */
     private static final long CONN_CHECK_FREQ = 2000;
@@ -76,9 +76,6 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
             case 5:
                 spi.setMaxAckTimeout(10000);
                 break;
-            case 6:
-                spi.setNetworkTimeout(4000);
-                break;
             default:
                 assert false;
         }