You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/23 09:40:45 UTC
[17/18] incubator-ignite git commit: ignite-752: reduced size of
connection check message, automatic connection check frequency calculation
ignite-752: reduced size of connection check message, automatic connection check frequency calculation
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a0edbbc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a0edbbc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a0edbbc0
Branch: refs/heads/ignite-752
Commit: a0edbbc0a800748633cd3f92d4537a04e8bbc2ba
Parents: 27b426b
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 23 10:36:57 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 23 10:36:57 2015 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 44 +++++++++++++++-----
.../spi/discovery/tcp/TcpDiscoverySpi.java | 31 --------------
.../TcpDiscoveryConnectionCheckMessage.java | 21 +++++++++-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 2 +-
...entDiscoverySpiFailureThresholdSelfTest.java | 38 ++++++++++++++++-
.../tcp/TcpDiscoverySpiConfigSelfTest.java | 3 --
...TcpDiscoverySpiFailureThresholdSelfTest.java | 9 +---
8 files changed, 94 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 5ed5a6c..6fc5893 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -191,7 +191,7 @@ public class IgniteConfiguration {
public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
/** Default failure detection threshold used by DiscoverySpi and CommunicationSpi in millis. */
- public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 12_000;
+ public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 10_000;
/** Optional grid name. */
private String gridName;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 56472aa..b085b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1774,19 +1774,43 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Incoming heartbeats check frequency. */
private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50;
+ /** Last time heartbeat message has been sent. */
+ private long lastTimeHbMsgSent;
+
/** Time when the last status message has been sent. */
private long lastTimeConnCheckMsgSent;
/** Flag that keeps info on whether the threshold is reached or not. */
private boolean failureThresholdReached;
- /** Last time hearbeat message has been sent. */
- private long lastTimeHbMsgSent;
+ /** Connection check frequency. */
+ private long connCheckFreq;
/**
*/
protected RingMessageWorker() {
super("tcp-disco-msg-worker", 10);
+
+ initConnectionCheckFrequency();
+ }
+
+ /**
+ * Initializes connection check frequency. Used only when failure detection threshold is enabled.
+ */
+ private void initConnectionCheckFrequency() {
+ if (spi.failureDetectionThresholdEnabled()) {
+ for (int i = 3; i > 0; i--) {
+ connCheckFreq = spi.failureDetectionThreshold() / i;
+
+ if (connCheckFreq > 0)
+ break;
+ }
+
+ assert connCheckFreq > 0;
+
+ if (log.isDebugEnabled())
+ log.debug("Connection check frequency is calculated: " + connCheckFreq);
+ }
}
/**
@@ -4054,12 +4078,12 @@ class ServerImpl extends TcpDiscoveryImpl {
log.info("Local node seems to be disconnected from topology (failure detection threshold " +
"is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
- ", connCheckFreq=" + spi.connCheckFreq + ']');
+ ", connCheckFreq=" + connCheckFreq + ']');
failureThresholdReached = true;
}
- long elapsed = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
+ long elapsed = (lastTimeConnCheckMsgSent + connCheckFreq) - U.currentTimeMillis();
if (elapsed > 0)
return;
@@ -4423,7 +4447,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode && recordable(msg))
debugLog("Message has been received: " + msg);
- if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+ if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+
+ continue;
+ }
+ else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
if (!req.responded()) {
@@ -4436,11 +4465,6 @@ class ServerImpl extends TcpDiscoveryImpl {
break;
}
}
- else if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
- spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
-
- continue;
- }
else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
if (clientMsgWrk != null) {
TcpDiscoverySpiState state = spiStateCopy();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index d754dab..be042eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -217,9 +217,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). */
public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5;
- /** Default connection check frequency. */
- public static final int DFLT_CONN_CHECK_FREQ = 2000;
-
/** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */
public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000;
@@ -260,9 +257,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Size of topology snapshots history. */
protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
- /** Connection check frequency. Used in conjunction with failure detection threshold. */
- protected long connCheckFreq = DFLT_CONN_CHECK_FREQ;
-
/** Grid discovery listener. */
protected volatile DiscoverySpiListener lsnr;
@@ -855,29 +849,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/**
- * Sets connection check frequency. Used in conjunction with {@link IgniteConfiguration#failureDetectionThreshold}.
- * <p>
- * A node sends connection check messages to its next node in the topology with this frequency to check its
- * connection status and quickly process a network related error if any.
- * <p>
- * The way to check connection aliveness with connection check messages is much cheaper than to use heartbeat
- * messages. The reason is that a connection check message is only processed by the next node in a topology,
- * while a heartbeat message is translated twice across the ring.
- * <p>
- * Affects server nodes only.
- *
- * @param connCheckFreq Frequency in milliseconds.
- * @return Tcp discovery spi.
- * @see IgniteConfiguration#setFailureDetectionThreshold(long)
- */
- @IgniteSpiConfiguration(optional = true)
- public TcpDiscoverySpi setConnectionCheckFrequency(long connCheckFreq) {
- this.connCheckFreq = connCheckFreq;
-
- return this;
- }
-
- /**
* @return Size of topology snapshots history.
*/
public long getTopHistorySize() {
@@ -1674,8 +1645,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
assertParameter(reconCnt > 0, "reconnectCnt > 0");
}
- else
- assertParameter(connCheckFreq < failureDetectionThreshold(), "failureDetectionThreshold > connCheckFreq");
assertParameter(netTimeout > 0, "networkTimeout > 0");
assertParameter(ipFinder != null, "ipFinder != null");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
index 046e2b5..9c8d7cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
@@ -20,16 +20,25 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.spi.discovery.tcp.internal.*;
+import java.io.*;
+
/**
* Message used to check whether a node is still connected to the topology.
* The difference from {@link TcpDiscoveryStatusCheckMessage} is that this message is sent to the next node
* which directly replies to the sender without message re-translation to the coordinator.
*/
-public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage {
+public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Externalizable {
/** */
private static final long serialVersionUID = 0L;
/**
+ * Default no-arg constructor for {@link Externalizable} interface.
+ */
+ public TcpDiscoveryConnectionCheckMessage() {
+ // No-op.
+ }
+
+ /**
* Constructor.
*
* @param creatorNode Node created this message.
@@ -42,4 +51,14 @@ public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMess
@Override public String toString() {
return S.toString(TcpDiscoveryConnectionCheckMessage.class, this, "super", super.toString());
}
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // No-op
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index ccb9717..514f784 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -166,7 +166,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
@Override public boolean apply() {
return recoveryDesc.messagesFutures().isEmpty();
}
- }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() + 5000 :
+ }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() + 7000 :
10_000);
assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
index 939286d..4c7dbe8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
@@ -34,6 +34,12 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
private final static int FAILURE_AWAIT_TIME = 7_000;
/** */
+ private final static long FAILURE_THRESHOLD = 10_000;
+
+ /** */
+ private static long failureThreshold = FAILURE_THRESHOLD;
+
+ /** */
private static boolean useTestSpi;
/** {@inheritDoc} */
@@ -43,7 +49,7 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
/** {@inheritDoc} */
@Override protected long failureDetectionThreshold() {
- return useTestSpi ? 5000 : 10_000;
+ return failureThreshold;
}
/** {@inheritDoc} */
@@ -86,7 +92,35 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
/**
* @throws Exception in case of error.
*/
- public void testFailureThresholdWorkability() throws Exception {
+ public void testFailureThresholdWorkabilityAvgTimeout() throws Exception {
+ failureThreshold = 3000;
+
+ try {
+ checkFailureThresholdWorkability();
+ }
+ finally {
+ failureThreshold = FAILURE_THRESHOLD;
+ }
+ }
+
+ /**
+ * @throws Exception in case of error.
+ */
+ public void testFailureThresholdWorkabilitySmallTimeout() throws Exception {
+ failureThreshold = 500;
+
+ try {
+ checkFailureThresholdWorkability();
+ }
+ finally {
+ failureThreshold = FAILURE_THRESHOLD;
+ }
+ }
+
+ /**
+ * @throws Exception in case of error.
+ */
+ private void checkFailureThresholdWorkability() throws Exception {
useTestSpi = true;
TestTcpDiscoverySpi firstSpi = null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
index 91f4f9e..8ab2116 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
@@ -42,8 +42,5 @@ public class TcpDiscoverySpiConfigSelfTest extends GridSpiAbstractConfigTest<Tcp
checkNegativeSpiProperty(new TcpDiscoverySpi(), "threadPriority", -1);
checkNegativeSpiProperty(new TcpDiscoverySpi(), "maxMissedHeartbeats", 0);
checkNegativeSpiProperty(new TcpDiscoverySpi(), "statisticsPrintFrequency", 0);
- checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency", 0);
- checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency",
- IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD + 1000);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
index 1ee839c..63e79c3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -40,9 +40,6 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
private static final int SPI_COUNT = 6;
/** */
- private static final long CONN_CHECK_FREQ = 2000;
-
- /** */
private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** {@inheritDoc} */
@@ -59,8 +56,6 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
switch (idx) {
case 0:
- spi.setConnectionCheckFrequency(CONN_CHECK_FREQ);
- break;
case 1:
// Ignore
break;
@@ -180,7 +175,7 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
firstSpi().countConnCheckMsg = true;
nextSpi.countConnCheckMsg = true;
- Thread.sleep(CONN_CHECK_FREQ * 5);
+ Thread.sleep(firstSpi().failureDetectionThreshold());
firstSpi().countConnCheckMsg = false;
nextSpi.countConnCheckMsg = false;
@@ -237,7 +232,7 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
firstSpi().countConnCheckMsg = true;
nextSpi.countConnCheckMsg = true;
- Thread.sleep(CONN_CHECK_FREQ * 5);
+ Thread.sleep(firstSpi().failureDetectionThreshold() / 2);
firstSpi().countConnCheckMsg = false;
nextSpi.countConnCheckMsg = false;