You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/21 16:35:36 UTC
incubator-ignite git commit: ignite-752: removed heartbeats sender and status checker threads
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-752 9319206bd -> c453ab8dc
ignite-752: removed heartbeats sender and status checker threads
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c453ab8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c453ab8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c453ab8d
Branch: refs/heads/ignite-752
Commit: c453ab8dc19327ac8b7df8350bd65ed0e237c3ac
Parents: 9319206
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Jul 21 17:35:23 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Jul 21 17:35:23 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 423 +++++--------------
...TcpDiscoverySpiFailureThresholdSelfTest.java | 5 +-
2 files changed, 114 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c453ab8d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 02e13ba..2a09c62 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -80,14 +80,6 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Client message workers. */
protected ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
- /** Metrics sender. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private HeartbeatsSender hbsSnd;
-
- /** Status checker. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private CheckStatusSender chkStatusSnd;
-
/** IP finder cleaner. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private IpFinderCleaner ipFinderCleaner;
@@ -229,12 +221,6 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onJoinFinished();
- hbsSnd = new HeartbeatsSender();
- hbsSnd.start();
-
- chkStatusSnd = spi.failureDetectionThresholdEnabled() ? new CheckConnectionWorker() : new CheckStatusSender();
- chkStatusSnd.start();
-
if (spi.ipFinder.isShared()) {
ipFinderCleaner = new IpFinderCleaner();
ipFinderCleaner.start();
@@ -319,12 +305,6 @@ class ServerImpl extends TcpDiscoveryImpl {
U.interrupt(tmp);
U.joinThreads(tmp, log);
- U.interrupt(hbsSnd);
- U.join(hbsSnd, log);
-
- U.interrupt(chkStatusSnd);
- U.join(chkStatusSnd, log);
-
U.interrupt(ipFinderCleaner);
U.join(ipFinderCleaner, log);
@@ -624,8 +604,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (locNode != null)
locNode.lastDataReceivedTime(U.currentTimeMillis());
- if (chkStatusSnd != null)
- chkStatusSnd.onDataReceived();
+ if (msgWorker != null)
+ // Node receives messages from remote nodes, reset this flag.
+ msgWorker.failureDetectionNotified = false;
}
}
@@ -1291,12 +1272,6 @@ class ServerImpl extends TcpDiscoveryImpl {
U.interrupt(tcpSrvr);
U.join(tcpSrvr, log);
- U.interrupt(hbsSnd);
- U.join(hbsSnd, log);
-
- U.interrupt(chkStatusSnd);
- U.join(chkStatusSnd, log);
-
U.interrupt(ipFinderCleaner);
U.join(ipFinderCleaner, log);
@@ -1386,12 +1361,6 @@ class ServerImpl extends TcpDiscoveryImpl {
b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
- if (spi.failureDetectionThresholdEnabled())
- b.append(" Check connection worker: ").append(threadStatus(chkStatusSnd)).append(U.nl());
- else
- b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
-
- b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
@@ -1475,261 +1444,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Thread that sends heartbeats.
- */
- private class HeartbeatsSender extends IgniteSpiThread {
- /**
- * Constructor.
- */
- private HeartbeatsSender() {
- super(spi.ignite().name(), "tcp-disco-hb-sender", log);
-
- setPriority(spi.threadPri);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("BusyWait")
- @Override protected void body() throws InterruptedException {
- while (!isLocalNodeCoordinator())
- Thread.sleep(1000);
-
- if (log.isDebugEnabled())
- log.debug("Heartbeats sender has been started.");
-
- UUID nodeId = getConfiguredNodeId();
-
- while (!isInterrupted()) {
- if (spiStateCopy() != CONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Stopping heartbeats sender (SPI is not connected to topology).");
-
- return;
- }
-
- TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId);
-
- msg.verify(getLocalNodeId());
-
- msgWorker.addMessage(msg);
-
- Thread.sleep(spi.hbFreq);
- }
- }
- }
-
- /**
- * Thread that sends status check messages to next node if local node has not
- * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
- * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
- * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
- */
- private class CheckStatusSender extends IgniteSpiThread {
- /** Heartbeats check timeout. */
- protected long hbCheckTimeout;
-
- /** Time when the last status message has been sent. */
- protected long lastTimeStatusMsgSent;
-
- /**
- * Constructor.
- */
- private CheckStatusSender() {
- super(spi.ignite().name(), "tcp-disco-status-check-sender", log);
-
- init();
- }
-
- /**
- * Constructor.
- *
- * @param threadName Name of the thread.
- */
- private CheckStatusSender(String threadName) {
- super(spi.ignite().name(), threadName, log);
-
- init();
- }
-
- /**
- * Inits {@code CheckStatusSender}.
- */
- private void init() {
- // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
- hbCheckTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
-
- setPriority(spi.threadPri);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("BusyWait")
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Status check sender has been started.");
-
- while (!isInterrupted()) {
- if (spiStateCopy() != CONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Stopping status check sender (SPI is not connected to topology).");
-
- return;
- }
-
- long hbTimeout = checkHeartbeats();
-
- assert hbTimeout > 0;
-
- if (hbTimeout > 0)
- Thread.sleep(hbTimeout);
- }
- }
-
- /**
- * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than
- * {@link TcpDiscoveryStatusCheckMessage} is sent accross the ring.
- *
- * @return Timeout to wait before calling this function again.
- */
- protected long checkHeartbeats() {
- // 1. Determine timeout.
- if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
- lastTimeStatusMsgSent = locNode.lastUpdateTime();
-
- long timeout = (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis();
-
- // 2. Still need to wait before sending?
- if (timeout > 0)
- return timeout;
-
- msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
-
- // 3. Send status check message.
- lastTimeStatusMsgSent = U.currentTimeMillis();
-
- return (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis();
- }
-
- /**
- * Called when a chunck of data is received from a remote node.
- */
- protected void onDataReceived() {
- // No-op
- }
-
- /**
- * Signals that a message added to the messages queue by this thread has been processed.
- */
- protected void messageProcessed() {
- throw new UnsupportedOperationException();
- }
- }
-
- /**
- * Thread performs the following two tasks:
- * <ul>
- * <li>
- * Send connection check message to the next node with {@link TcpDiscoverySpi#connCheckFreq} frequency;
- * </li>
- * <li>
- * Sends status check messages to next node if local node has not
- * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
- * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
- * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
- * </li>
- * </ul>
- */
- private class CheckConnectionWorker extends CheckStatusSender {
- /** */
- private volatile boolean msgInQueue;
-
- /** */
- private volatile boolean logMsgPrinted;
-
- /** Time when the last status message has been sent. */
- private long lastTimeConnCheckMsgSent;
-
- /**
- * Constructor
- */
- public CheckConnectionWorker() {
- super("tcp-disco-conn-check-worker");
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Connection check worker has been started.");
-
- while (!isInterrupted()) {
- if (spiStateCopy() != CONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Stopping connection check worker (SPI is not connected to topology).");
-
- return;
- }
-
- long hbTimeout = checkHeartbeats();
-
- assert hbTimeout > 0;
-
- long connTimeout = checkConnection();
-
- assert connTimeout > 0;
-
- Thread.sleep(Math.min(hbTimeout, connTimeout));
- }
- }
-
- /**
- * Check connection aliveness status.
- *
- * @return Timeout to wait before caliing this method the next time.
- */
- private long checkConnection() {
- if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() &&
- ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
-
- if (!logMsgPrinted) {
- log.info("Local node seems to be disconnected from topology (failure detection threshold " +
- "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
- ", connCheckFreq=" + spi.connCheckFreq + ']');
-
- logMsgPrinted = true;
- }
- }
-
- if (msgInQueue)
- return spi.connCheckFreq;
-
- long timeout = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
-
- if (timeout > 0)
- return timeout;
-
- if (ring.hasRemoteNodes()) {
- // Send the message using ring message worker in order to reuse an existed socket to the next node.
- msgInQueue = true;
-
- msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode));
-
- lastTimeConnCheckMsgSent = U.currentTimeMillis();
- }
-
- return spi.connCheckFreq;
- }
-
-
- /** {@inheritDoc} */
- @Override protected void onDataReceived() {
- logMsgPrinted = false;
- }
-
- /** {@inheritDoc} */
- @Override protected void messageProcessed() {
- msgInQueue = false;
- }
- }
-
- /**
* Thread that cleans IP finder and keeps it in the correct state, unregistering
* addresses of the nodes that has left the topology.
* <p>
@@ -2051,6 +1765,21 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Socket. */
private Socket sock;
+ /** Time the last status check message was sent; advanced to locNode.lastUpdateTime() when that is later. */
+ private long lastTimeStatusMsgSent;
+
+ /** Incoming heartbeats check interval in ms: maxMissedHbs * hbFreq plus a 50 ms margin to avoid false alarms. */
+ private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50;
+
+ /** Time when the last connection check message has been sent. */
+ private long lastTimeConnCheckMsgSent;
+
+ /** Whether the "failure detection threshold is reached" info message was logged; reset when data is received. */
+ private volatile boolean failureDetectionNotified;
+
+ /** Time when the last heartbeat message has been sent. */
+ private long lastTimeHbMsgSent;
+
/**
*/
protected RingMessageWorker() {
@@ -2117,6 +1846,15 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onMessageProcessingFinished(msg);
}
+ /** {@inheritDoc} Runs periodic checks; NOTE(review): invoked only on an empty poll, so busy queues delay them. */
+ @Override protected void noMessageLoop() {
+ checkConnection();
+
+ sendHeartbeatMessage();
+
+ checkHeartbeatsReceiving();
+ }
+
/**
* Sends message across the ring.
*
@@ -4039,7 +3777,6 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Connection check message discarded (local node is leaving topology).");
- chkStatusSnd.messageProcessed();
return;
}
@@ -4047,27 +3784,19 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Connection check message discarded (no next node in topology).");
- chkStatusSnd.messageProcessed();
return;
}
- try {
- // Link to the 'next' node is updated only inside RingMessageWorker thread, no need to check on 'null'.
- if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
- TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
- TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) {
- // Preserve backward compatibility with nodes of older versions.
- TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null);
- stMsg.replacedConnCheckMsg(true);
-
- processStatusCheckMessage(stMsg);
- }
- else if (ring.hasRemoteNodes())
- sendMessageAcrossRing(msg);
- }
- finally {
- chkStatusSnd.messageProcessed();
- }
+ // Link to the 'next' node is updated only inside RingMessageWorker thread, no need to check on 'null'.
+ if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+ TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) {
+ // Preserve backward compatibility with nodes of older versions.
+ TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null);
+ stMsg.replacedConnCheckMsg(true);
+
+ processStatusCheckMessage(stMsg);
+ } else if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
}
/**
@@ -4272,6 +4001,71 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
}
+
+ /**
+ * Enqueues a heartbeat if SPI is connected, the local node is coordinator and spi.hbFreq ms passed since the last.
+ */
+ private void sendHeartbeatMessage() {
+ if (spiStateCopy() != CONNECTED || !isLocalNodeCoordinator())
+ return;
+
+ long remaining = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
+
+ if (remaining > 0)
+ return;
+
+ TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
+
+ msg.verify(getLocalNodeId());
+
+ msgWorker.addMessage(msg);
+
+ lastTimeHbMsgSent = U.currentTimeMillis();
+ }
+
+ /**
+ * If neither {@code locNode.lastUpdateTime()} nor the last status check falls within {@code hbCheckFreq} ms,
+ * sends {@link TcpDiscoveryStatusCheckMessage} across the ring. Does nothing unless SPI is connected.
+ */
+ private void checkHeartbeatsReceiving() {
+ if (spiStateCopy() != CONNECTED)
+ return;
+
+ lastTimeStatusMsgSent = Math.max(lastTimeStatusMsgSent, locNode.lastUpdateTime());
+
+ if (lastTimeStatusMsgSent + hbCheckFreq > U.currentTimeMillis())
+ return;
+
+ msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+
+ lastTimeStatusMsgSent = U.currentTimeMillis();
+ }
+
+ /**
+ * Checks connection aliveness; active only when SPI is connected and failure detection threshold is enabled.
+ */
+ private void checkConnection() {
+ if (!spi.failureDetectionThresholdEnabled() || spiStateCopy() != CONNECTED)
+ return;
+
+ if (!failureDetectionNotified && ring.hasRemoteNodes() &&
+ U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold()) {
+ log.info("Local node seems to be disconnected from topology (failure detection threshold " +
+ "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
+ ", connCheckFreq=" + spi.connCheckFreq + ']');
+
+ failureDetectionNotified = true;
+ }
+
+ long remaining = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
+
+ if (remaining > 0 || !ring.hasRemoteNodes())
+ return;
+
+ processConnectionCheckMessage(new TcpDiscoveryConnectionCheckMessage(locNode));
+
+ lastTimeConnCheckMsgSent = U.currentTimeMillis();
+ }
}
/**
@@ -5219,10 +5013,12 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
+ TcpDiscoveryAbstractMessage msg = queue.poll(10, TimeUnit.MILLISECONDS);
- if (msg == null)
+ if (msg == null) {
+ noMessageLoop();
continue;
+ }
processMessage(msg);
}
@@ -5268,6 +5064,13 @@ class ServerImpl extends TcpDiscoveryImpl {
protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
/**
+ * Called when the queue poll times out without a message, giving subclasses a chance to do periodic work.
+ */
+ protected void noMessageLoop() {
+ // No-op by default; RingMessageWorker overrides it to run its periodic checks.
+ }
+
+ /**
* @param sock Socket.
* @param msg Message.
* @param timeout Socket timeout.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c453ab8d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
index 4fdf886..362be15 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -37,7 +37,7 @@ import java.net.*;
*/
public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySelfTest {
/** */
- private static final int SPI_COUNT = 7;
+ private static final int SPI_COUNT = 6;
/** */
private static final long CONN_CHECK_FREQ = 2000;
@@ -76,9 +76,6 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
case 5:
spi.setMaxAckTimeout(10000);
break;
- case 6:
- spi.setNetworkTimeout(4000);
- break;
default:
assert false;
}