You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/19 12:33:20 UTC

[6/9] incubator-ignite git commit: ignite-752: optimized connection check message impl

ignite-752: optimized connection check message impl


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0cc31b27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0cc31b27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0cc31b27

Branch: refs/heads/ignite-752
Commit: 0cc31b27d449e18251e1d681a401f21365f8e529
Parents: 9878f40
Author: Denis Magda <dm...@gridgain.com>
Authored: Sun Jul 19 09:54:13 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Sun Jul 19 09:54:13 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 248 ++++++++++++-------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   2 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   6 +-
 .../TcpDiscoveryStatusCheckMessage.java         |  19 ++
 4 files changed, 179 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9e9921b..f05d027 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -88,9 +88,6 @@ class ServerImpl extends TcpDiscoveryImpl {
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private CheckStatusSender chkStatusSnd;
 
-    /** Connection checker. */
-    private CheckConnectionWorker chkConnWorker;
-
     /** IP finder cleaner. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private IpFinderCleaner ipFinderCleaner;
@@ -235,10 +232,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         hbsSnd = new HeartbeatsSender();
         hbsSnd.start();
 
-        chkConnWorker = new CheckConnectionWorker();
-        chkConnWorker.start();
-
-        chkStatusSnd = new CheckStatusSender();
+        chkStatusSnd = spi.failureDetectionThresholdEnabled() ? new CheckConnectionWorker() : new CheckStatusSender();
         chkStatusSnd.start();
 
         if (spi.ipFinder.isShared()) {
@@ -329,9 +323,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(hbsSnd);
         U.join(hbsSnd, log);
 
-        U.interrupt(chkConnWorker);
-        U.join(chkConnWorker, log);
-
         U.interrupt(chkStatusSnd);
         U.join(chkStatusSnd, log);
 
@@ -629,10 +620,10 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override protected void onDataRead() {
+    @Override protected void onDataReceived() {
         if (spi.failureDetectionThresholdEnabled()) {
             locNode.lastDataReceivedTime(U.currentTimeMillis());
-            chkConnWorker.reset();
+            chkStatusSnd.onDataReceived();
         }
     }
 
@@ -1304,14 +1295,8 @@ class ServerImpl extends TcpDiscoveryImpl {
         U.interrupt(hbsSnd);
         U.join(hbsSnd, log);
 
-        if (spi.failureDetectionThresholdEnabled()) {
-            U.interrupt(chkConnWorker);
-            U.join(chkConnWorker, log);
-        }
-        else {
-            U.interrupt(chkStatusSnd);
-            U.join(chkStatusSnd, log);
-        }
+        U.interrupt(chkStatusSnd);
+        U.join(chkStatusSnd, log);
 
         U.interrupt(ipFinderCleaner);
         U.join(ipFinderCleaner, log);
@@ -1403,7 +1388,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
 
             if (spi.failureDetectionThresholdEnabled())
-                b.append("    Check connectino worker: ").append(threadStatus(chkConnWorker)).append(U.nl());
+                b.append("    Check connection worker: ").append(threadStatus(chkStatusSnd)).append(U.nl());
             else
                 b.append("    Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
 
@@ -1540,12 +1525,39 @@ class ServerImpl extends TcpDiscoveryImpl {
      * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
      */
     private class CheckStatusSender extends IgniteSpiThread {
+        /** Heartbeats check timeout. */
+        protected long hbCheckTimeout;
+
+        /** Time when the last status message has been sent. */
+        protected long lastTimeStatusMsgSent;
+
         /**
          * Constructor.
          */
         private CheckStatusSender() {
             super(spi.ignite().name(), "tcp-disco-status-check-sender", log);
 
+            init();
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param threadName Name of the thread.
+         */
+        private CheckStatusSender(String threadName) {
+            super(spi.ignite().name(), threadName, log);
+
+            init();
+        }
+
+        /**
+         * Inits {@code CheckStatusSender}.
+         */
+        private void init() {
+            // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
+            hbCheckTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
+
             setPriority(spi.threadPri);
         }
 
@@ -1555,22 +1567,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Status check sender has been started.");
 
-            // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
-            long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
-
-            long lastSent = 0;
-
             while (!isInterrupted()) {
-                // 1. Determine timeout.
-                if (lastSent < locNode.lastUpdateTime())
-                    lastSent = locNode.lastUpdateTime();
-
-                long timeout = (lastSent + checkTimeout) - U.currentTimeMillis();
-
-                if (timeout > 0)
-                    Thread.sleep(timeout);
-
-                // 2. Check if SPI is still connected.
                 if (spiStateCopy() != CONNECTED) {
                     if (log.isDebugEnabled())
                         log.debug("Stopping status check sender (SPI is not connected to topology).");
@@ -1578,41 +1575,73 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
-                // 3. Was there an update?
-                if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping status check send " +
-                            "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) +
-                            ", hasRmts=" + ring.hasRemoteNodes() + ']');
+                long hbTimeout = checkHeartbeats();
 
-                    continue;
-                }
-
-                // 4. Send status check message.
-                lastSent = U.currentTimeMillis();
+                assert hbTimeout > 0;
 
-                msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+                if (hbTimeout > 0)
+                    Thread.sleep(hbTimeout);
             }
         }
+
+        /**
+         * Enqueues a {@link TcpDiscoveryStatusCheckMessage} to the message worker if {@code hbCheckTimeout} has elapsed
+         * since the later of the local node's last update time and the last status check message enqueued here.
+         *
+         * @return Milliseconds to wait before calling this method again.
+         */
+        protected long checkHeartbeats() {
+            // 1. Determine timeout.
+            if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
+                lastTimeStatusMsgSent = locNode.lastUpdateTime();
+
+            long timeout = (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis();
+
+            // 2. Still need to wait before sending?
+            if (timeout > 0)
+                return timeout;
+
+            msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); // 3. Send status check message.
+
+            // 4. Remember when the message was enqueued; the next check is due a full hbCheckTimeout later.
+            lastTimeStatusMsgSent = U.currentTimeMillis();
+
+            return (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis();
+        }
+
+        /**
+         * Called when a chunk of data is received from a remote node. Does nothing in this base sender.
+         */
+        protected void onDataReceived() {
+            // No-op
+        }
+
+        /**
+         * Signals that a message added to the message worker's queue by this thread has been processed.
+         */
+        protected void messageProcessed() {
+            throw new UnsupportedOperationException(); // Base sender does not track its enqueued messages.
+        }
     }
 
     /**
      * TODO: IGNITE-752
      */
-    private class CheckConnectionWorker extends IgniteSpiThread {
+    private class CheckConnectionWorker extends CheckStatusSender {
         /** */
         private volatile boolean msgInQueue;
 
         /** */
         private volatile boolean logMsgPrinted;
 
+        /** Time when the last status message has been sent. */
+        private long lastTimeConnCheckMsgSent;
+
         /**
          * Constructor
          */
         public CheckConnectionWorker() {
-            super(spi.ignite().name(), "tcp-disco-conn-check-worker", log);
-
-            setPriority(spi.threadPri);
+            super("tcp-disco-conn-check-worker");
         }
 
         /** {@inheritDoc} */
@@ -1628,46 +1657,63 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
-                if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() &&
-                    ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+                long hbTimeout = checkHeartbeats();
 
-                    if (!logMsgPrinted) {
-                        log.info("Local node seems to be disconnected from topology (failure detection threshold " +
-                            "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
-                            ", connCheckFreq=" + spi.connCheckFreq + ']');
+                assert hbTimeout > 0;
 
-                        logMsgPrinted = true;
-                    }
-                }
+                long connTimeout = checkConnection();
 
-                if (msgInQueue) {
-                    Thread.sleep(spi.connCheckFreq);
+                assert connTimeout > 0;
 
-                    continue;
-                }
+                Thread.sleep(Math.min(hbTimeout, connTimeout));
+            }
+        }
 
-                if (ring.hasRemoteNodes()) {
-                    // Send the message using ring message worker in order to reuse an existed socket to the next node.
-                    msgInQueue = true;
+        /**
+         * TODO: IGNITE-752
+         * @return
+         */
+        private long checkConnection() {
+            if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() &&
+                ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+
+                if (!logMsgPrinted) {
+                    log.info("Local node seems to be disconnected from topology (failure detection threshold " +
+                        "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
+                        ", connCheckFreq=" + spi.connCheckFreq + ']');
 
-                    msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode));
+                    logMsgPrinted = true;
                 }
+            }
+
+            if (msgInQueue)
+                return spi.connCheckFreq;
+
+            long timeout = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
 
-                Thread.sleep(spi.connCheckFreq);
+            if (timeout > 0)
+                return timeout;
+
+            if (ring.hasRemoteNodes()) {
+                // Send the message using ring message worker in order to reuse an existed socket to the next node.
+                msgInQueue = true;
+
+                msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode));
+
+                lastTimeConnCheckMsgSent = U.currentTimeMillis();
             }
+
+            return spi.connCheckFreq;
         }
 
-        /**
-         * TODO: IGNITE-752
-         */
-        private void reset() {
+
+        /** {@inheritDoc} */
+        @Override protected void onDataReceived() {
             logMsgPrinted = false;
         }
 
-        /**
-         * TODO: IGNITE-752
-         */
-        private void messageProcessed() {
+        /** {@inheritDoc} */
+        protected void messageProcessed() {
             msgInQueue = false;
         }
     }
@@ -2129,6 +2175,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (ring.hasRemoteNodes()) {
                             msg.senderNodeId(locNodeId);
 
+                            if (msg instanceof TcpDiscoveryConnectionCheckMessage ||
+                                (msg instanceof TcpDiscoveryStatusCheckMessage &&
+                                ((TcpDiscoveryStatusCheckMessage)msg).replacedConnCheckMsg()))
+                                break;
+
                             addMessage(msg);
                         }
 
@@ -2380,16 +2431,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 }
                             }
 
-                            if (msg instanceof TcpDiscoveryConnectionCheckMessage && next.version().greaterThanEqual(
-                                    TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
-                                    TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
-                                    TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) {
-                                // Preserve backward compatibility with nodes of older versions.
-                                assert msg.creatorNodeId().equals(getLocalNodeId());
-
-                                msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
-                            }
-
                             prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
 
                             try {
@@ -3973,16 +4014,37 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (spiStateCopy() != CONNECTED) {
                 if (log.isDebugEnabled())
-                    log.debug("Connection check message discarded (local node receives updates).");
+                    log.debug("Connection check message discarded (local node is leaving topology).");
 
-                chkConnWorker.messageProcessed();
+                chkStatusSnd.messageProcessed();
                 return;
             }
 
-            if (ring.hasRemoteNodes())
-                sendMessageAcrossRing(msg);
+            if (next == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Connection check message discarded (no next node in topology).");
+
+                chkStatusSnd.messageProcessed();
+                return;
+            }
 
-            chkConnWorker.messageProcessed();
+            try {
+                // Link to the 'next' node is updated only inside RingMessageWorker thread, no need to check on 'null'.
+                if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+                    TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
+                    TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) {
+                    // Preserve backward compatibility with nodes of older versions.
+                    TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null);
+                    stMsg.replacedConnCheckMsg(true);
+
+                    processStatusCheckMessage(stMsg);
+                }
+                else if (ring.hasRemoteNodes())
+                    sendMessageAcrossRing(msg);
+            }
+            finally {
+                chkStatusSnd.messageProcessed();
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index c1cf9ab..20d49df 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -133,7 +133,7 @@ abstract class TcpDiscoveryImpl {
     /**
      * TODO: IGNITE-752
      */
-    protected void onDataRead() {
+    protected void onDataReceived() {
         // No-op
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index fcba8c6..f231c29 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1362,7 +1362,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
             T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
 
-            impl.onDataRead();
+            impl.onDataReceived();
 
             return res;
         }
@@ -1405,7 +1405,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             if (res == -1)
                 throw new EOFException();
 
-            impl.onDataRead();
+            impl.onDataReceived();
 
             return res;
         }
@@ -1655,6 +1655,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
             assertParameter(reconCnt > 0, "reconnectCnt > 0");
         }
+        else
+            assertParameter(connCheckFreq < failureDetectionThreshold(), "failureDetectionThreshold > connCheckFreq");
 
         assertParameter(ipFinder != null, "ipFinder != null");
         assertParameter(hbFreq > 0, "heartbeatFreq > 0");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
index bec7093..b2c2e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
@@ -49,6 +49,9 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage
     /** Creator node status (initialized by coordinator). */
     private int status;
 
+    /** Whether this message replaced {@link TcpDiscoveryConnectionCheckMessage} to preserve backward compatibility. */
+    private transient boolean replacedConnCheckMsg;
+
     /**
      * Constructor.
      *
@@ -98,6 +101,22 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage
         this.status = status;
     }
 
+    /**
+     * Gets whether this message replaced {@link TcpDiscoveryConnectionCheckMessage} for backward compatibility.
+     * @return {@code True} if this message was created in place of a connection check message.
+     */
+    public boolean replacedConnCheckMsg() {
+        return replacedConnCheckMsg;
+    }
+
+    /**
+     * Sets whether this message replaced {@link TcpDiscoveryConnectionCheckMessage} for backward compatibility.
+     * @param replacedConnCheckMsg {@code True} if this message is created in place of a connection check message.
+     */
+    public void replacedConnCheckMsg(boolean replacedConnCheckMsg) {
+        this.replacedConnCheckMsg = replacedConnCheckMsg;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString());