You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/23 09:40:45 UTC

[17/18] incubator-ignite git commit: ignite-752: reduced size of connection check message, automatic connection check frequency calculation

ignite-752: reduced size of connection check message, automatic connection check frequency calculation


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a0edbbc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a0edbbc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a0edbbc0

Branch: refs/heads/ignite-752
Commit: a0edbbc0a800748633cd3f92d4537a04e8bbc2ba
Parents: 27b426b
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Jul 23 10:36:57 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Jul 23 10:36:57 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 44 +++++++++++++++-----
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 31 --------------
 .../TcpDiscoveryConnectionCheckMessage.java     | 21 +++++++++-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |  2 +-
 ...entDiscoverySpiFailureThresholdSelfTest.java | 38 ++++++++++++++++-
 .../tcp/TcpDiscoverySpiConfigSelfTest.java      |  3 --
 ...TcpDiscoverySpiFailureThresholdSelfTest.java |  9 +---
 8 files changed, 94 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 5ed5a6c..6fc5893 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -191,7 +191,7 @@ public class IgniteConfiguration {
     public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
 
     /** Default failure detection threshold used by DiscoverySpi and CommunicationSpi in millis. */
-    public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 12_000;
+    public static final long DFLT_FAILURE_DETECTION_THRESHOLD = 10_000;
 
     /** Optional grid name. */
     private String gridName;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 56472aa..b085b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1774,19 +1774,43 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Incoming heartbeats check frequency. */
         private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50;
 
+        /** Last time heartbeat message has been sent. */
+        private long lastTimeHbMsgSent;
+
         /** Time when the last status message has been sent. */
         private long lastTimeConnCheckMsgSent;
 
         /** Flag that keeps info on whether the threshold is reached or not. */
         private boolean failureThresholdReached;
 
-        /** Last time hearbeat message has been sent. */
-        private long lastTimeHbMsgSent;
+        /** Connection check frequency. */
+        private long connCheckFreq;
 
         /**
          */
         protected RingMessageWorker() {
             super("tcp-disco-msg-worker", 10);
+
+            initConnectionCheckFrequency();
+        }
+
+        /**
+         * Initializes connection check frequency. Used only when failure detection threshold is enabled.
+         */
+        private void initConnectionCheckFrequency() {
+            if (spi.failureDetectionThresholdEnabled()) {
+                for (int i = 3; i > 0; i--) {
+                    connCheckFreq = spi.failureDetectionThreshold() / i;
+
+                    if (connCheckFreq > 0)
+                        break;
+                }
+
+                assert connCheckFreq > 0;
+
+                if (log.isDebugEnabled())
+                    log.debug("Connection check frequency is calculated: " + connCheckFreq);
+            }
         }
 
         /**
@@ -4054,12 +4078,12 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 log.info("Local node seems to be disconnected from topology (failure detection threshold " +
                              "is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
-                             ", connCheckFreq=" + spi.connCheckFreq + ']');
+                             ", connCheckFreq=" + connCheckFreq + ']');
 
                 failureThresholdReached = true;
             }
 
-            long elapsed = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
+            long elapsed = (lastTimeConnCheckMsgSent + connCheckFreq) - U.currentTimeMillis();
 
             if (elapsed > 0)
                 return;
@@ -4423,7 +4447,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (debugMode && recordable(msg))
                             debugLog("Message has been received: " + msg);
 
-                        if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                        if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+
+                            continue;
+                        }
+                        else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                             TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
 
                             if (!req.responded()) {
@@ -4436,11 +4465,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     break;
                             }
                         }
-                        else if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
-                            spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
-
-                            continue;
-                        }
                         else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
                             if (clientMsgWrk != null) {
                                 TcpDiscoverySpiState state = spiStateCopy();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index d754dab..be042eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -217,9 +217,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). */
     public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5;
 
-    /** Default connection check frequency. */
-    public static final int DFLT_CONN_CHECK_FREQ = 2000;
-
     /** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */
     public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000;
 
@@ -260,9 +257,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Size of topology snapshots history. */
     protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
 
-    /** Connection check frequency. Used in conjunction with failure detection threshold. */
-    protected long connCheckFreq = DFLT_CONN_CHECK_FREQ;
-
     /** Grid discovery listener. */
     protected volatile DiscoverySpiListener lsnr;
 
@@ -855,29 +849,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
-     * Sets connection check frequency. Used in conjunction with {@link IgniteConfiguration#failureDetectionThreshold}.
-     * <p>
-     * A node sends connection check messages to its next node in the topology with this frequency to check its
-     * connection status and quickly process a network related error if any.
-     * <p>
-     * The way to check connection aliveness with connection check messages is much cheaper than to use heartbeat
-     * messages. The reason is that a connection check message is only processed by the next node in a topology,
-     * while a heartbeat message is translated twice across the ring.
-     * <p>
-     * Affects server nodes only.
-     *
-     * @param connCheckFreq Frequency in milliseconds.
-     * @return Tcp discovery spi.
-     * @see IgniteConfiguration#setFailureDetectionThreshold(long)
-     */
-    @IgniteSpiConfiguration(optional =  true)
-    public TcpDiscoverySpi setConnectionCheckFrequency(long connCheckFreq) {
-        this.connCheckFreq = connCheckFreq;
-
-        return this;
-    }
-
-    /**
      * @return Size of topology snapshots history.
      */
     public long getTopHistorySize() {
@@ -1674,8 +1645,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
             assertParameter(reconCnt > 0, "reconnectCnt > 0");
         }
-        else
-            assertParameter(connCheckFreq < failureDetectionThreshold(), "failureDetectionThreshold > connCheckFreq");
 
         assertParameter(netTimeout > 0, "networkTimeout > 0");
         assertParameter(ipFinder != null, "ipFinder != null");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
index 046e2b5..9c8d7cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
@@ -20,16 +20,25 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.spi.discovery.tcp.internal.*;
 
+import java.io.*;
+
 /**
  * Message used to check whether a node is still connected to the topology.
  * The difference from {@link TcpDiscoveryStatusCheckMessage} is that this message is sent to the next node
  * which directly replies to the sender without message re-translation to the coordinator.
  */
-public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage {
+public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
     /**
+     * Default no-arg constructor for {@link Externalizable} interface.
+     */
+    public TcpDiscoveryConnectionCheckMessage() {
+        // No-op.
+    }
+
+    /**
      * Constructor.
      *
      * @param creatorNode Node created this message.
@@ -42,4 +51,14 @@ public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMess
     @Override public String toString() {
         return S.toString(TcpDiscoveryConnectionCheckMessage.class, this, "super", super.toString());
     }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // No-op
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index ccb9717..514f784 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -166,7 +166,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
                                 @Override public boolean apply() {
                                     return recoveryDesc.messagesFutures().isEmpty();
                                 }
-                            }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() + 5000 :
+                            }, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() + 7000 :
                                 10_000);
 
                             assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
index 939286d..4c7dbe8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
@@ -34,6 +34,12 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
     private final static int FAILURE_AWAIT_TIME = 7_000;
 
     /** */
+    private final static long FAILURE_THRESHOLD = 10_000;
+
+    /** */
+    private static long failureThreshold = FAILURE_THRESHOLD;
+
+    /** */
     private static boolean useTestSpi;
 
     /** {@inheritDoc} */
@@ -43,7 +49,7 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
 
     /** {@inheritDoc} */
     @Override protected long failureDetectionThreshold() {
-        return useTestSpi ? 5000 : 10_000;
+        return failureThreshold;
     }
 
     /** {@inheritDoc} */
@@ -86,7 +92,35 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
     /**
      * @throws Exception in case of error.
      */
-    public void testFailureThresholdWorkability() throws Exception {
+    public void testFailureThresholdWorkabilityAvgTimeout() throws Exception {
+        failureThreshold = 3000;
+
+        try {
+            checkFailureThresholdWorkability();
+        }
+        finally {
+            failureThreshold = FAILURE_THRESHOLD;
+        }
+    }
+
+    /**
+     * @throws Exception in case of error.
+     */
+    public void testFailureThresholdWorkabilitySmallTimeout() throws Exception {
+        failureThreshold = 500;
+
+        try {
+            checkFailureThresholdWorkability();
+        }
+        finally {
+            failureThreshold = FAILURE_THRESHOLD;
+        }
+    }
+
+    /**
+     * @throws Exception in case of error.
+     */
+    private void checkFailureThresholdWorkability() throws Exception {
         useTestSpi = true;
 
         TestTcpDiscoverySpi firstSpi = null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
index 91f4f9e..8ab2116 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
@@ -42,8 +42,5 @@ public class TcpDiscoverySpiConfigSelfTest extends GridSpiAbstractConfigTest<Tcp
         checkNegativeSpiProperty(new TcpDiscoverySpi(), "threadPriority", -1);
         checkNegativeSpiProperty(new TcpDiscoverySpi(), "maxMissedHeartbeats", 0);
         checkNegativeSpiProperty(new TcpDiscoverySpi(), "statisticsPrintFrequency", 0);
-        checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency", 0);
-        checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency",
-            IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD + 1000);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a0edbbc0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
index 1ee839c..63e79c3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -40,9 +40,6 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
     private static final int SPI_COUNT = 6;
 
     /** */
-    private static final long CONN_CHECK_FREQ = 2000;
-
-    /** */
     private TcpDiscoveryIpFinder ipFinder =  new TcpDiscoveryVmIpFinder(true);
 
     /** {@inheritDoc} */
@@ -59,8 +56,6 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
 
         switch (idx) {
             case 0:
-                spi.setConnectionCheckFrequency(CONN_CHECK_FREQ);
-                break;
             case 1:
                 // Ignore
                 break;
@@ -180,7 +175,7 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
             firstSpi().countConnCheckMsg = true;
             nextSpi.countConnCheckMsg = true;
 
-            Thread.sleep(CONN_CHECK_FREQ * 5);
+            Thread.sleep(firstSpi().failureDetectionThreshold());
 
             firstSpi().countConnCheckMsg = false;
             nextSpi.countConnCheckMsg = false;
@@ -237,7 +232,7 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
             firstSpi().countConnCheckMsg = true;
             nextSpi.countConnCheckMsg = true;
 
-            Thread.sleep(CONN_CHECK_FREQ * 5);
+            Thread.sleep(firstSpi().failureDetectionThreshold() / 2);
 
             firstSpi().countConnCheckMsg = false;
             nextSpi.countConnCheckMsg = false;