You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/19 12:33:20 UTC
[6/9] incubator-ignite git commit: ignite-752: optimized connection
check message impl
ignite-752: optimized connection check message impl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0cc31b27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0cc31b27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0cc31b27
Branch: refs/heads/ignite-752
Commit: 0cc31b27d449e18251e1d681a401f21365f8e529
Parents: 9878f40
Author: Denis Magda <dm...@gridgain.com>
Authored: Sun Jul 19 09:54:13 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Sun Jul 19 09:54:13 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 248 ++++++++++++-------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 2 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 6 +-
.../TcpDiscoveryStatusCheckMessage.java | 19 ++
4 files changed, 179 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9e9921b..f05d027 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -88,9 +88,6 @@ class ServerImpl extends TcpDiscoveryImpl {
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private CheckStatusSender chkStatusSnd;
- /** Connection checker. */
- private CheckConnectionWorker chkConnWorker;
-
/** IP finder cleaner. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private IpFinderCleaner ipFinderCleaner;
@@ -235,10 +232,7 @@ class ServerImpl extends TcpDiscoveryImpl {
hbsSnd = new HeartbeatsSender();
hbsSnd.start();
- chkConnWorker = new CheckConnectionWorker();
- chkConnWorker.start();
-
- chkStatusSnd = new CheckStatusSender();
+ chkStatusSnd = spi.failureDetectionThresholdEnabled() ? new CheckConnectionWorker() : new CheckStatusSender();
chkStatusSnd.start();
if (spi.ipFinder.isShared()) {
@@ -329,9 +323,6 @@ class ServerImpl extends TcpDiscoveryImpl {
U.interrupt(hbsSnd);
U.join(hbsSnd, log);
- U.interrupt(chkConnWorker);
- U.join(chkConnWorker, log);
-
U.interrupt(chkStatusSnd);
U.join(chkStatusSnd, log);
@@ -629,10 +620,10 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override protected void onDataRead() {
+ @Override protected void onDataReceived() {
if (spi.failureDetectionThresholdEnabled()) {
locNode.lastDataReceivedTime(U.currentTimeMillis());
- chkConnWorker.reset();
+ chkStatusSnd.onDataReceived();
}
}
@@ -1304,14 +1295,8 @@ class ServerImpl extends TcpDiscoveryImpl {
U.interrupt(hbsSnd);
U.join(hbsSnd, log);
- if (spi.failureDetectionThresholdEnabled()) {
- U.interrupt(chkConnWorker);
- U.join(chkConnWorker, log);
- }
- else {
- U.interrupt(chkStatusSnd);
- U.join(chkStatusSnd, log);
- }
+ U.interrupt(chkStatusSnd);
+ U.join(chkStatusSnd, log);
U.interrupt(ipFinderCleaner);
U.join(ipFinderCleaner, log);
@@ -1403,7 +1388,7 @@ class ServerImpl extends TcpDiscoveryImpl {
b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
if (spi.failureDetectionThresholdEnabled())
- b.append(" Check connectino worker: ").append(threadStatus(chkConnWorker)).append(U.nl());
+ b.append(" Check connection worker: ").append(threadStatus(chkStatusSnd)).append(U.nl());
else
b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
@@ -1540,12 +1525,39 @@ class ServerImpl extends TcpDiscoveryImpl {
* {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
*/
private class CheckStatusSender extends IgniteSpiThread {
+ /** Heartbeats check timeout. */
+ protected long hbCheckTimeout;
+
+ /** Time when the last status message has been sent. */
+ protected long lastTimeStatusMsgSent;
+
/**
* Constructor.
*/
private CheckStatusSender() {
super(spi.ignite().name(), "tcp-disco-status-check-sender", log);
+ init();
+ }
+
+ /**
+ * Constructor allowing a custom thread name (used by {@link CheckConnectionWorker}).
+ *
+ * @param threadName Name of the thread.
+ */
+ private CheckStatusSender(String threadName) {
+ super(spi.ignite().name(), threadName, log);
+
+ init();
+ }
+
+ /**
+ * Initializes the heartbeat check timeout and sets the thread priority.
+ */
+ private void init() {
+ // Allow up to maxMissedHbs heartbeat periods without an update (long math avoids int overflow); +50 ms avoids false alarm.
+ hbCheckTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
+
setPriority(spi.threadPri);
}
@@ -1555,22 +1567,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Status check sender has been started.");
- // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
- long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
-
- long lastSent = 0;
-
while (!isInterrupted()) {
- // 1. Determine timeout.
- if (lastSent < locNode.lastUpdateTime())
- lastSent = locNode.lastUpdateTime();
-
- long timeout = (lastSent + checkTimeout) - U.currentTimeMillis();
-
- if (timeout > 0)
- Thread.sleep(timeout);
-
- // 2. Check if SPI is still connected.
if (spiStateCopy() != CONNECTED) {
if (log.isDebugEnabled())
log.debug("Stopping status check sender (SPI is not connected to topology).");
@@ -1578,41 +1575,73 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- // 3. Was there an update?
- if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) {
- if (log.isDebugEnabled())
- log.debug("Skipping status check send " +
- "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) +
- ", hasRmts=" + ring.hasRemoteNodes() + ']');
+ long hbTimeout = checkHeartbeats();
- continue;
- }
-
- // 4. Send status check message.
- lastSent = U.currentTimeMillis();
+ assert hbTimeout > 0;
- msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+ if (hbTimeout > 0)
+ Thread.sleep(hbTimeout);
}
}
+
+ /**
+ * Checks the last time a heartbeat update was received. If more than {@code hbCheckTimeout} ms have passed then
+ * {@link TcpDiscoveryStatusCheckMessage} is sent across the ring (skipped if there are no remote nodes).
+ *
+ * @return Timeout to wait before calling this function again; always positive.
+ */
+ protected long checkHeartbeats() {
+ // 1. Determine timeout, counting from the later of the last local node update and the last status check.
+ if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
+ lastTimeStatusMsgSent = locNode.lastUpdateTime();
+
+ long timeout = (lastTimeStatusMsgSent + hbCheckTimeout) - U.currentTimeMillis();
+
+ // 2. Still need to wait before sending?
+ if (timeout > 0)
+ return timeout;
+
+ // 3. Send status check message (nothing to check if the local node is alone in the ring).
+ if (ring.hasRemoteNodes())
+ msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+
+ lastTimeStatusMsgSent = U.currentTimeMillis();
+ return hbCheckTimeout;
+ }
+
+ /**
+ * Called when a chunk of data is received from a remote node. No-op by default.
+ */
+ protected void onDataReceived() {
+ // No-op
+ }
+
+ /**
+ * Signals that a message added to the messages queue by this thread has been processed. Unsupported here.
+ */
+ protected void messageProcessed() {
+ throw new UnsupportedOperationException("Status check sender does not enqueue connection check messages.");
+ }
}
/**
* TODO: IGNITE-752
*/
- private class CheckConnectionWorker extends IgniteSpiThread {
+ private class CheckConnectionWorker extends CheckStatusSender {
/** */
private volatile boolean msgInQueue;
/** */
private volatile boolean logMsgPrinted;
+ /** Time when the last connection check message has been sent. */
+ private long lastTimeConnCheckMsgSent;
+
/**
* Constructor
*/
public CheckConnectionWorker() {
- super(spi.ignite().name(), "tcp-disco-conn-check-worker", log);
-
- setPriority(spi.threadPri);
+ super("tcp-disco-conn-check-worker");
}
/** {@inheritDoc} */
@@ -1628,46 +1657,63 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() &&
- ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+ long hbTimeout = checkHeartbeats();
- if (!logMsgPrinted) {
- log.info("Local node seems to be disconnected from topology (failure detection threshold " +
- "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
- ", connCheckFreq=" + spi.connCheckFreq + ']');
+ assert hbTimeout > 0;
- logMsgPrinted = true;
- }
- }
+ long connTimeout = checkConnection();
- if (msgInQueue) {
- Thread.sleep(spi.connCheckFreq);
+ assert connTimeout > 0;
- continue;
- }
+ Thread.sleep(Math.min(hbTimeout, connTimeout));
+ }
+ }
- if (ring.hasRemoteNodes()) {
- // Send the message using ring message worker in order to reuse an existed socket to the next node.
- msgInQueue = true;
+ /**
+ * TODO: IGNITE-752
+ * @return
+ */
+ private long checkConnection() {
+ if (U.currentTimeMillis() - locNode.lastDataReceivedTime() >= spi.failureDetectionThreshold() &&
+ ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+
+ if (!logMsgPrinted) {
+ log.info("Local node seems to be disconnected from topology (failure detection threshold " +
+ "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
+ ", connCheckFreq=" + spi.connCheckFreq + ']');
- msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode));
+ logMsgPrinted = true;
}
+ }
+
+ if (msgInQueue)
+ return spi.connCheckFreq;
+
+ long timeout = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
- Thread.sleep(spi.connCheckFreq);
+ if (timeout > 0)
+ return timeout;
+
+ if (ring.hasRemoteNodes()) {
+ // Send the message using ring message worker in order to reuse an existed socket to the next node.
+ msgInQueue = true;
+
+ msgWorker.addMessage(new TcpDiscoveryConnectionCheckMessage(locNode));
+
+ lastTimeConnCheckMsgSent = U.currentTimeMillis();
}
+
+ return spi.connCheckFreq;
}
- /**
- * TODO: IGNITE-752
- */
- private void reset() {
+
+ /** {@inheritDoc} Re-arms the "local node seems to be disconnected" log message. */
+ @Override protected void onDataReceived() {
logMsgPrinted = false;
}
- /**
- * TODO: IGNITE-752
- */
- private void messageProcessed() {
+ /** {@inheritDoc} Allows the next connection check message to be enqueued. */
+ @Override protected void messageProcessed() {
msgInQueue = false;
}
}
@@ -2129,6 +2175,11 @@ class ServerImpl extends TcpDiscoveryImpl {
if (ring.hasRemoteNodes()) {
msg.senderNodeId(locNodeId);
+ if (msg instanceof TcpDiscoveryConnectionCheckMessage ||
+ (msg instanceof TcpDiscoveryStatusCheckMessage &&
+ ((TcpDiscoveryStatusCheckMessage)msg).replacedConnCheckMsg()))
+ break;
+
addMessage(msg);
}
@@ -2380,16 +2431,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (msg instanceof TcpDiscoveryConnectionCheckMessage && next.version().greaterThanEqual(
- TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
- TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
- TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) {
- // Preserve backward compatibility with nodes of older versions.
- assert msg.creatorNodeId().equals(getLocalNodeId());
-
- msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
- }
-
prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
try {
@@ -3973,16 +4014,37 @@ class ServerImpl extends TcpDiscoveryImpl {
if (spiStateCopy() != CONNECTED) {
if (log.isDebugEnabled())
- log.debug("Connection check message discarded (local node receives updates).");
+ log.debug("Connection check message discarded (local node is leaving topology).");
- chkConnWorker.messageProcessed();
+ chkStatusSnd.messageProcessed();
return;
}
- if (ring.hasRemoteNodes())
- sendMessageAcrossRing(msg);
+ if (next == null) {
+ if (log.isDebugEnabled())
+ log.debug("Connection check message discarded (no next node in topology).");
+
+ chkStatusSnd.messageProcessed();
+ return;
+ }
- chkConnWorker.messageProcessed();
+ try {
+ // Link to the 'next' node is updated only inside RingMessageWorker thread, no need to check on 'null'.
+ if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+ TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
+ TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) {
+ // Preserve backward compatibility with nodes of older versions.
+ TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null);
+ stMsg.replacedConnCheckMsg(true);
+
+ processStatusCheckMessage(stMsg);
+ }
+ else if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
+ }
+ finally {
+ chkStatusSnd.messageProcessed();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index c1cf9ab..20d49df 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -133,7 +133,7 @@ abstract class TcpDiscoveryImpl {
/**
* TODO: IGNITE-752
*/
- protected void onDataRead() {
+ protected void onDataReceived() {
// No-op
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index fcba8c6..f231c29 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1362,7 +1362,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
- impl.onDataRead();
+ impl.onDataReceived();
return res;
}
@@ -1405,7 +1405,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
if (res == -1)
throw new EOFException();
- impl.onDataRead();
+ impl.onDataReceived();
return res;
}
@@ -1655,6 +1655,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
assertParameter(reconCnt > 0, "reconnectCnt > 0");
}
+ else
+ assertParameter(connCheckFreq < failureDetectionThreshold(), "failureDetectionThreshold > connCheckFreq");
assertParameter(ipFinder != null, "ipFinder != null");
assertParameter(hbFreq > 0, "heartbeatFreq > 0");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cc31b27/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
index bec7093..b2c2e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java
@@ -49,6 +49,9 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage
/** Creator node status (initialized by coordinator). */
private int status;
+ /** Whether this message replaced {@link TcpDiscoveryConnectionCheckMessage} to preserve backward compatibility. */
+ private transient boolean replacedConnCheckMsg;
+
/**
* Constructor.
*
@@ -98,6 +101,22 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage
this.status = status;
}
+ /**
+ * Gets whether this message replaced {@link TcpDiscoveryConnectionCheckMessage} for backward compatibility.
+ * @return {@code True} if this message was created in place of a connection check message.
+ */
+ public boolean replacedConnCheckMsg() {
+ return replacedConnCheckMsg;
+ }
+
+ /**
+ * Sets whether this message replaced {@link TcpDiscoveryConnectionCheckMessage} for backward compatibility.
+ * @param replacedConnCheckMsg {@code True} if this message is created in place of a connection check message.
+ */
+ public void replacedConnCheckMsg(boolean replacedConnCheckMsg) {
+ this.replacedConnCheckMsg = replacedConnCheckMsg;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString());