You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/01 15:55:14 UTC

incubator-ignite git commit: ignite-752: implemented

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-752 e91bc484e -> 392274ee5


ignite-752: implemented


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/392274ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/392274ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/392274ee

Branch: refs/heads/ignite-752
Commit: 392274ee5d079a22d1c5c8010d0c101cbc462867
Parents: e91bc48
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 1 16:54:58 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 1 16:54:58 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  44 +++++++
 .../communication/tcp/TcpCommunicationSpi.java  |  48 ++++++--
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 119 +++++++++++--------
 3 files changed, 148 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/392274ee/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 2d36c7a..d1d55ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -108,6 +108,12 @@ public class IgniteConfiguration {
     /** Default message send retries count. */
     public static final int DFLT_SEND_RETRY_CNT = 3;
 
+    /** Default communication timeout for network related operations in SPI. Value is <tt>5000ms</tt>. */
+    public static final long DFLT_SPI_COMMUNICATION_TIMEOUT = 5000;
+
+    /** Default communication timeout for network related operations in SPI in client mode. Value is <tt>5000ms</tt>. */
+    public static final long DFLT_SPI_COMMUNICATION_TIMEOUT_CLIENT = 5000;
+
     /** Default number of clock sync samples. */
     public static final int DFLT_CLOCK_SYNC_SAMPLES = 8;
 
@@ -258,6 +264,9 @@ public class IgniteConfiguration {
     /** Maximum network requests timeout. */
     private long netTimeout = DFLT_NETWORK_TIMEOUT;
 
+    /** SPI communication timeout ({@code null} if it has not been set explicitly). */
+    private Long spiCommTimeout;
+
     /** Interval between message send retries. */
     private long sndRetryDelay = DFLT_SEND_RETRY_DELAY;
 
@@ -468,6 +477,7 @@ public class IgniteConfiguration {
         metricsUpdateFreq = cfg.getMetricsUpdateFrequency();
         mgmtPoolSize = cfg.getManagementThreadPoolSize();
         netTimeout = cfg.getNetworkTimeout();
+        spiCommTimeout = cfg.getSpiCommunicationTimeout();
         nodeId = cfg.getNodeId();
         p2pEnabled = cfg.isPeerClassLoadingEnabled();
         p2pLocClsPathExcl = cfg.getPeerClassLoadingLocalClassPathExclude();
@@ -1176,6 +1186,40 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Sets maximum timeout for most network related operations of {@link TcpDiscoverySpi} and
+     * {@link TcpCommunicationSpi}.
+     * <p>
+     * In case of {@link TcpDiscoverySpi} this communication timeout is used as default for socket timeout,
+     * network timeout and message acknowledgement timeout. Based on this timeout, default values are calculated
+     * for {@link TcpDiscoverySpi} heartbeat frequency and maximum message acknowledgement timeout.
+     * <p>
+     * In case of {@link TcpCommunicationSpi} this timeout is set as default for socket connection timeout and
+     * used for maximum socket connection timeout calculation.
+     *
+     * @param spiCommTimeout SPI communication timeout.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setSpiCommunicationTimeout(long spiCommTimeout) {
+        this.spiCommTimeout = spiCommTimeout;
+
+        return this;
+    }
+
+    /**
+     * Gets SPI communication timeout used as default for most network related operations of {@link TcpDiscoverySpi}
+     * and {@link TcpCommunicationSpi}.
+     * <p>
+     * If the timeout is not explicitly set, then either {@link #DFLT_SPI_COMMUNICATION_TIMEOUT} or
+     * {@link #DFLT_SPI_COMMUNICATION_TIMEOUT_CLIENT} is used depending on whether a node runs in server or client
+     * mode.
+     *
+     * @return Timeout value or {@code null} if the value has not been set explicitly.
+     */
+    public Long getSpiCommunicationTimeout() {
+        return spiCommTimeout;
+    }
+
+    /**
      * Interval in milliseconds between message send retries.
      * <p>
      * If not provided, then default value

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/392274ee/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index addf243d..ce6882c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -173,12 +173,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Default socket send and receive buffer size. */
     public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
 
-    /** Default connection timeout (value is <tt>5000</tt>ms). */
-    public static final long DFLT_CONN_TIMEOUT = 5000;
-
-    /** Default Maximum connection timeout (value is <tt>600,000</tt>ms). */
-    public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;
-
     /** Default reconnect attempts count (value is <tt>10</tt>). */
     public static final int DFLT_RECONNECT_CNT = 10;
 
@@ -634,10 +628,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
 
     /** Connect timeout. */
-    private long connTimeout = DFLT_CONN_TIMEOUT;
+    private Long connTimeout;
 
     /** Maximum connect timeout. */
-    private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
+    private Long maxConnTimeout;
 
     /** Reconnect attempts count. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -963,7 +957,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * <p>
      * {@code 0} is interpreted as infinite timeout.
      * <p>
-     * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
+     * If not provided, default value is {@link IgniteConfiguration#getSpiCommunicationTimeout()}.
      *
      * @param connTimeout Connect timeout.
      */
@@ -974,7 +968,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public long getConnectTimeout() {
-        return connTimeout;
+        if (connTimeout != null)
+            return connTimeout;
+
+        if (ignite != null && ignite.configuration().getSpiCommunicationTimeout() != null)
+            return ignite.configuration().getSpiCommunicationTimeout();
+
+        return IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT;
     }
 
     /**
@@ -985,7 +985,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * <p>
      * {@code 0} is interpreted as infinite timeout.
      * <p>
-     * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
+     * If not provided, default value is calculated in a way that connection timeout will be doubled and used at least
+     * four times before failing.
      *
      * @param maxConnTimeout Maximum connect timeout.
      */
@@ -996,7 +997,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public long getMaxConnectTimeout() {
-        return maxConnTimeout;
+        if (maxConnTimeout != null)
+            return maxConnTimeout;
+
+        return defaultMaxConnectTimeout();
     }
 
     /**
@@ -1309,6 +1313,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @Override public void spiStart(String gridName) throws IgniteSpiException {
         assert locHost != null;
 
+        if (connTimeout == null)
+            connTimeout = ignite.configuration().getSpiCommunicationTimeout() != null ?
+                ignite.configuration().getSpiCommunicationTimeout() :
+                IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT;
+
+        if (maxConnTimeout == null)
+            // connTimeout will be doubled at least four times before failing. Using geometric progression formula.
+            maxConnTimeout = defaultMaxConnectTimeout();
+
         // Start SPI start stopwatch.
         startStopwatch();
 
@@ -2020,7 +2033,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     long rcvCnt = -1;
 
                     try {
-                        ch.socket().connect(addr, (int)connTimeout);
+                        ch.socket().connect(addr, connTimeout.intValue());
 
                         rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
 
@@ -2348,6 +2361,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * Calculates default value for max connect timeout.
+     *
+     * @return Max connect timeout.
+     */
+    private long defaultMaxConnectTimeout() {
+        return getConnectTimeout() * (1 << 4);
+    }
+    
+    /**
      * @param msg Error message.
      * @param e Exception.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/392274ee/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 7663fe6..764ec92 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -157,6 +157,9 @@ import java.util.concurrent.atomic.*;
 @DiscoverySpiOrderSupport(true)
 @DiscoverySpiHistorySupport(true)
 public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+    /** Divider applied to the SPI communication timeout to calculate the default heartbeat frequency. */
+    private static final double HEARTBEAT_DIVIDER = 2.5;
+
     /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
     public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
 
@@ -169,30 +172,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Default timeout for joining topology (value is <tt>0</tt>). */
     public static final long DFLT_JOIN_TIMEOUT = 0;
 
-    /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
-    public static final long DFLT_NETWORK_TIMEOUT = 5000;
-
     /** Default value for thread priority (value is <tt>10</tt>). */
     public static final int DFLT_THREAD_PRI = 10;
 
-    /** Default heartbeat messages issuing frequency (value is <tt>2000ms</tt>). */
-    public static final long DFLT_HEARTBEAT_FREQ = 2000;
-
     /** Default size of topology snapshots history. */
     public static final int DFLT_TOP_HISTORY_SIZE = 1000;
 
-    /** Default socket operations timeout in milliseconds (value is <tt>5000ms</tt>). */
-    public static final long DFLT_SOCK_TIMEOUT = 5000;
-
-    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5000ms</tt>). */
-    public static final long DFLT_ACK_TIMEOUT = 5000;
-
-    /** Default socket operations timeout in milliseconds (value is <tt>5000ms</tt>). */
-    public static final long DFLT_SOCK_TIMEOUT_CLIENT = 5000;
-
-    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5000ms</tt>). */
-    public static final long DFLT_ACK_TIMEOUT_CLIENT = 5000;
-
     /** Default reconnect attempts count (value is <tt>10</tt>). */
     public static final int DFLT_RECONNECT_CNT = 10;
 
@@ -208,9 +193,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Default statistics print frequency in milliseconds (value is <tt>0ms</tt>). */
     public static final long DFLT_STATS_PRINT_FREQ = 0;
 
-    /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
-    public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
-
     /** Local address. */
     protected String locAddr;
 
@@ -221,13 +203,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     protected TcpDiscoveryIpFinder ipFinder;
 
     /** Socket operations timeout. */
-    protected long sockTimeout; // Must be initialized in the constructor of child class.
+    protected Long sockTimeout; // When null, spiStart() falls back to the SPI communication timeout.
 
     /** Message acknowledgement timeout. */
-    protected long ackTimeout; // Must be initialized in the constructor of child class.
+    protected Long ackTimeout; // When null, spiStart() falls back to the SPI communication timeout.
 
     /** Network timeout. */
-    protected long netTimeout = DFLT_NETWORK_TIMEOUT;
+    protected Long netTimeout;
 
     /** Join timeout. */
     @SuppressWarnings("RedundantFieldInitialization")
@@ -237,7 +219,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     protected int threadPri = DFLT_THREAD_PRI;
 
     /** Heartbeat messages issuing frequency. */
-    protected long hbFreq = DFLT_HEARTBEAT_FREQ;
+    protected Long hbFreq;
 
     /** Size of topology snapshots history. */
     protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
@@ -290,7 +272,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
 
     /** Maximum message acknowledgement timeout. */
-    protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
+    protected Long maxAckTimeout;
 
     /** Max heartbeats count node can miss without initiating status check. */
     protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
@@ -505,7 +487,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** {@inheritDoc} */
     @Override public long getMaxAckTimeout() {
-        return maxAckTimeout;
+        return maxAckTimeout != null ? maxAckTimeout : defaultMaxAckTimeout();
     }
 
     /**
@@ -516,7 +498,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * timeout will be increased. If no acknowledgement is received and {@code maxAckTimeout}
      * is reached, then the process of message sending is considered as failed.
      * <p>
-     * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}.
+     * If not specified, default is calculated in a way that message acknowledgement timeout will be doubled and used
+     * at least four times before failing.
      * <p>
      * Affected server nodes only.
      *
@@ -690,7 +673,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * Note that when running Ignite on Amazon EC2, socket timeout must be set to a value
      * significantly greater than the default (e.g. to {@code 30000}).
      * <p>
-     * If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}.
+     * If not specified, default is {@link IgniteConfiguration#getSpiCommunicationTimeout()}.
      *
      * @param sockTimeout Socket connection timeout.
      */
@@ -707,7 +690,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * If acknowledgement is not received within this timeout, sending is considered as failed
      * and SPI tries to repeat message sending.
      * <p>
-     * If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}.
+     * If not specified, default is {@link IgniteConfiguration#getSpiCommunicationTimeout()}.
      *
      * @param ackTimeout Acknowledgement timeout.
      */
@@ -719,13 +702,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
+     * Deprecated, use {@link IgniteConfiguration#setSpiCommunicationTimeout(long)} instead.
+     *
      * Sets maximum network timeout to use for network operations.
      * <p>
-     * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
+     * If not specified, default is {@link IgniteConfiguration#getSpiCommunicationTimeout()}.
      *
      * @param netTimeout Network timeout.
      */
     @IgniteSpiConfiguration(optional = true)
+    @Deprecated
     public TcpDiscoverySpi setNetworkTimeout(long netTimeout) {
         this.netTimeout = netTimeout;
 
@@ -776,7 +762,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
      * in configurable time interval to other nodes to notify them about its state.
      * <p>
-     * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
+     * If not provided, default value is {@link IgniteConfiguration#getSpiCommunicationTimeout()} divided by two and a
+     * half.
      *
      * @param hbFreq Heartbeat frequency in milliseconds.
      */
@@ -927,17 +914,17 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** {@inheritDoc} */
     @Override public long getSocketTimeout() {
-        return sockTimeout;
+        return sockTimeout != null ? sockTimeout : defaultSpiCommunicationTimeout();
     }
 
     /** {@inheritDoc} */
     @Override public long getAckTimeout() {
-        return ackTimeout;
+        return ackTimeout != null ? ackTimeout : defaultSpiCommunicationTimeout();
     }
 
     /** {@inheritDoc} */
     @Override public long getNetworkTimeout() {
-        return netTimeout;
+        return netTimeout != null ? netTimeout : defaultSpiCommunicationTimeout();
     }
 
     /** {@inheritDoc} */
@@ -947,7 +934,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** {@inheritDoc} */
     @Override public long getHeartbeatFrequency() {
-        return hbFreq;
+        return hbFreq != null ? hbFreq : (long)(defaultSpiCommunicationTimeout() / HEARTBEAT_DIVIDER);
     }
 
     /** {@inheritDoc} */
@@ -1112,7 +1099,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         sock.setTcpNoDelay(true);
 
-        sock.connect(resolved, (int)sockTimeout);
+        sock.connect(resolved, sockTimeout.intValue());
 
         writeToSocket(sock, U.IGNITE_HEADER);
 
@@ -1540,26 +1527,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
     /** {@inheritDoc} */
     @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
-        if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
-            if (ackTimeout == 0)
-                ackTimeout = DFLT_ACK_TIMEOUT_CLIENT;
+        long commTimeout = defaultSpiCommunicationTimeout();
+
+        if (ackTimeout == null)
+            ackTimeout = commTimeout;
+
+        if (sockTimeout == null)
+            sockTimeout = commTimeout;
+
+        if (netTimeout == null)
+            netTimeout = commTimeout;
+
+        if (hbFreq == null)
+            hbFreq = (long)(commTimeout / HEARTBEAT_DIVIDER);
 
-            if (sockTimeout == 0)
-                sockTimeout = DFLT_SOCK_TIMEOUT_CLIENT;
+        if (maxAckTimeout == null)
+            // ackTimeout will be doubled at least four times before failing. Using geometric progression formula.
+            maxAckTimeout = defaultMaxAckTimeout();
 
+        if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
             impl = new ClientImpl(this);
 
             ctxInitLatch.countDown();
         }
-        else {
-            if (ackTimeout == 0)
-                ackTimeout = DFLT_ACK_TIMEOUT;
-
-            if (sockTimeout == 0)
-                sockTimeout = DFLT_SOCK_TIMEOUT;
-
+        else
             impl = new ServerImpl(this);
-        }
 
         assertParameter(ipFinder != null, "ipFinder != null");
         assertParameter(hbFreq > 0, "heartbeatFreq > 0");
@@ -1764,6 +1756,33 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
+     * Default SPI communication timeout.
+     *
+     * @return SPI communication timeout.
+     */
+    private long defaultSpiCommunicationTimeout() {
+        if (ignite == null)
+            return IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT;
+
+        if (ignite().configuration().getSpiCommunicationTimeout() != null)
+            return ignite().configuration().getSpiCommunicationTimeout();
+
+        if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode())))
+            return IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT_CLIENT;
+        else
+            return IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT;
+    }
+
+    /**
+     * Default max ack timeout.
+     *
+     * @return Max acknowledgement timeout.
+     */
+    private long defaultMaxAckTimeout() {
+        return getAckTimeout() * (1 << 4);
+    }
+
+    /**
      * Socket timeout object.
      */
     private class SocketTimeoutObject implements IgniteSpiTimeoutObject {