You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/01 15:55:14 UTC
incubator-ignite git commit: ignite-752: implemented
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-752 e91bc484e -> 392274ee5
ignite-752: implemented
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/392274ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/392274ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/392274ee
Branch: refs/heads/ignite-752
Commit: 392274ee5d079a22d1c5c8010d0c101cbc462867
Parents: e91bc48
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 1 16:54:58 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 1 16:54:58 2015 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 44 +++++++
.../communication/tcp/TcpCommunicationSpi.java | 48 ++++++--
.../spi/discovery/tcp/TcpDiscoverySpi.java | 119 +++++++++++--------
3 files changed, 148 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/392274ee/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 2d36c7a..d1d55ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -108,6 +108,12 @@ public class IgniteConfiguration {
/** Default message send retries count. */
public static final int DFLT_SEND_RETRY_CNT = 3;
+ /** Default communication timeout for network related operations in SPI. Value is <tt>5000ms</tt>. */
+ public static final long DFLT_SPI_COMMUNICATION_TIMEOUT = 5000;
+
+ /** Default communication timeout for network related operations in SPI in client mode. Value is <tt>5000ms</tt>. */
+ public static final long DFLT_SPI_COMMUNICATION_TIMEOUT_CLIENT = 5000;
+
/** Default number of clock sync samples. */
public static final int DFLT_CLOCK_SYNC_SAMPLES = 8;
@@ -258,6 +264,9 @@ public class IgniteConfiguration {
/** Maximum network requests timeout. */
private long netTimeout = DFLT_NETWORK_TIMEOUT;
+ /** SPI communication timeout. */
+ private Long spiCommTimeout;
+
/** Interval between message send retries. */
private long sndRetryDelay = DFLT_SEND_RETRY_DELAY;
@@ -468,6 +477,7 @@ public class IgniteConfiguration {
metricsUpdateFreq = cfg.getMetricsUpdateFrequency();
mgmtPoolSize = cfg.getManagementThreadPoolSize();
netTimeout = cfg.getNetworkTimeout();
+ spiCommTimeout = cfg.getSpiCommunicationTimeout();
nodeId = cfg.getNodeId();
p2pEnabled = cfg.isPeerClassLoadingEnabled();
p2pLocClsPathExcl = cfg.getPeerClassLoadingLocalClassPathExclude();
@@ -1176,6 +1186,40 @@ public class IgniteConfiguration {
}
/**
+ * Sets maximum timeout for most network related operations of {@link TcpDiscoverySpi} and
+ * {@link TcpCommunicationSpi}.
+ *
+ * In case of {@link TcpDiscoverySpi} this communication timeout is used as default for socket timeout,
+ * network timeout and message acknowledgement timeout. Based on this timeout default values are calculated
+ * for {@link TcpDiscoverySpi} heartbeat frequency and maximum message acknowledgement timeout.
+ *
+ * In case of {@link TcpCommunicationSpi} this timeout is set as default for socket connection timeout and
+ * used for maximum socket connection timeout calculation.
+ *
+ * @param spiCommTimeout SPI communication timeout.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setSpiCommunicationTimeout(long spiCommTimeout) {
+ this.spiCommTimeout = spiCommTimeout;
+
+ return this;
+ }
+
+ /**
+ * Gets SPI communication timeout used as default for most network related operations of {@link TcpDiscoverySpi} and
+ * {@link TcpCommunicationSpi}.
+ *
+ * If the timeout is not explicitly set, then either {@link #DFLT_SPI_COMMUNICATION_TIMEOUT} or
+ * {@link #DFLT_SPI_COMMUNICATION_TIMEOUT_CLIENT} is used depending on whether a node is run in server or client
+ * mode.
+ *
+ * @return Timeout value or {@code null} if the value has not been set explicitly.
+ */
+ public Long getSpiCommunicationTimeout() {
+ return spiCommTimeout;
+ }
+
+ /**
* Interval in milliseconds between message send retries.
* <p>
* If not provided, then default value
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/392274ee/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index addf243d..ce6882c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -173,12 +173,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Default socket send and receive buffer size. */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
- /** Default connection timeout (value is <tt>5000</tt>ms). */
- public static final long DFLT_CONN_TIMEOUT = 5000;
-
- /** Default Maximum connection timeout (value is <tt>600,000</tt>ms). */
- public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;
-
/** Default reconnect attempts count (value is <tt>10</tt>). */
public static final int DFLT_RECONNECT_CNT = 10;
@@ -634,10 +628,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
/** Connect timeout. */
- private long connTimeout = DFLT_CONN_TIMEOUT;
+ private Long connTimeout;
/** Maximum connect timeout. */
- private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
+ private Long maxConnTimeout;
/** Reconnect attempts count. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -963,7 +957,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* <p>
* {@code 0} is interpreted as infinite timeout.
* <p>
- * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
+ * If not provided, default value is {@link IgniteConfiguration#getSpiCommunicationTimeout()}.
*
* @param connTimeout Connect timeout.
*/
@@ -974,7 +968,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public long getConnectTimeout() {
- return connTimeout;
+ if (connTimeout != null)
+ return connTimeout;
+
+ if (ignite != null && ignite.configuration().getSpiCommunicationTimeout() != null)
+ return ignite.configuration().getSpiCommunicationTimeout();
+
+ return IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT;
}
/**
@@ -985,7 +985,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* <p>
* {@code 0} is interpreted as infinite timeout.
* <p>
- * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
+ * If not provided, default value is calculated in a way that connection timeout will be doubled and used at least
+ * four times before failing.
*
* @param maxConnTimeout Maximum connect timeout.
*/
@@ -996,7 +997,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public long getMaxConnectTimeout() {
- return maxConnTimeout;
+ if (maxConnTimeout != null)
+ return maxConnTimeout;
+
+ return defaultMaxConnectTimeout();
}
/**
@@ -1309,6 +1313,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
@Override public void spiStart(String gridName) throws IgniteSpiException {
assert locHost != null;
+ if (connTimeout == null)
+ connTimeout = ignite.configuration().getSpiCommunicationTimeout() != null ?
+ ignite.configuration().getSpiCommunicationTimeout() :
+ IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT;
+
+ if (maxConnTimeout == null)
+ // Default max is connTimeout * 16 (connTimeout doubled four times).
+ maxConnTimeout = defaultMaxConnectTimeout();
+
// Start SPI start stopwatch.
startStopwatch();
@@ -2020,7 +2033,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long rcvCnt = -1;
try {
- ch.socket().connect(addr, (int)connTimeout);
+ ch.socket().connect(addr, connTimeout.intValue());
rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
@@ -2348,6 +2361,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/**
+ * Calculates default value for max connect timeout.
+ *
+ * @return Max connect timeout.
+ */
+ private long defaultMaxConnectTimeout() {
+ return getConnectTimeout() * (1 << 4);
+ }
+
+ /**
* @param msg Error message.
* @param e Exception.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/392274ee/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 7663fe6..764ec92 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -157,6 +157,9 @@ import java.util.concurrent.atomic.*;
@DiscoverySpiOrderSupport(true)
@DiscoverySpiHistorySupport(true)
public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+ /** */
+ private static final double HEARTBEAT_DIVIDER = 2.5;
+
/** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
@@ -169,30 +172,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Default timeout for joining topology (value is <tt>0</tt>). */
public static final long DFLT_JOIN_TIMEOUT = 0;
- /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
- public static final long DFLT_NETWORK_TIMEOUT = 5000;
-
/** Default value for thread priority (value is <tt>10</tt>). */
public static final int DFLT_THREAD_PRI = 10;
- /** Default heartbeat messages issuing frequency (value is <tt>2000ms</tt>). */
- public static final long DFLT_HEARTBEAT_FREQ = 2000;
-
/** Default size of topology snapshots history. */
public static final int DFLT_TOP_HISTORY_SIZE = 1000;
- /** Default socket operations timeout in milliseconds (value is <tt>5000ms</tt>). */
- public static final long DFLT_SOCK_TIMEOUT = 5000;
-
- /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5000ms</tt>). */
- public static final long DFLT_ACK_TIMEOUT = 5000;
-
- /** Default socket operations timeout in milliseconds (value is <tt>5000ms</tt>). */
- public static final long DFLT_SOCK_TIMEOUT_CLIENT = 5000;
-
- /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5000ms</tt>). */
- public static final long DFLT_ACK_TIMEOUT_CLIENT = 5000;
-
/** Default reconnect attempts count (value is <tt>10</tt>). */
public static final int DFLT_RECONNECT_CNT = 10;
@@ -208,9 +193,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Default statistics print frequency in milliseconds (value is <tt>0ms</tt>). */
public static final long DFLT_STATS_PRINT_FREQ = 0;
- /** Maximum ack timeout value for receiving message acknowledgement in milliseconds (value is <tt>600,000ms</tt>). */
- public static final long DFLT_MAX_ACK_TIMEOUT = 10 * 60 * 1000;
-
/** Local address. */
protected String locAddr;
@@ -221,13 +203,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
protected TcpDiscoveryIpFinder ipFinder;
/** Socket operations timeout. */
- protected long sockTimeout; // Must be initialized in the constructor of child class.
+ protected Long sockTimeout; // Must be initialized in the constructor of child class.
/** Message acknowledgement timeout. */
- protected long ackTimeout; // Must be initialized in the constructor of child class.
+ protected Long ackTimeout; // Must be initialized in the constructor of child class.
/** Network timeout. */
- protected long netTimeout = DFLT_NETWORK_TIMEOUT;
+ protected Long netTimeout;
/** Join timeout. */
@SuppressWarnings("RedundantFieldInitialization")
@@ -237,7 +219,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
protected int threadPri = DFLT_THREAD_PRI;
/** Heartbeat messages issuing frequency. */
- protected long hbFreq = DFLT_HEARTBEAT_FREQ;
+ protected Long hbFreq;
/** Size of topology snapshots history. */
protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
@@ -290,7 +272,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
/** Maximum message acknowledgement timeout. */
- protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
+ protected Long maxAckTimeout;
/** Max heartbeats count node can miss without initiating status check. */
protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
@@ -505,7 +487,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** {@inheritDoc} */
@Override public long getMaxAckTimeout() {
- return maxAckTimeout;
+ return maxAckTimeout != null ? maxAckTimeout : defaultMaxAckTimeout();
}
/**
@@ -516,7 +498,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* timeout will be increased. If no acknowledgement is received and {@code maxAckTimeout}
* is reached, then the process of message sending is considered as failed.
* <p>
- * If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}.
+ * If not specified, default is calculated in a way that message acknowledgement timeout will be doubled and used
+ * at least four times before failing.
* <p>
* Affected server nodes only.
*
@@ -690,7 +673,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* Note that when running Ignite on Amazon EC2, socket timeout must be set to a value
* significantly greater than the default (e.g. to {@code 30000}).
* <p>
- * If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}.
+ * If not specified, default is {@link IgniteConfiguration#getSpiCommunicationTimeout()}.
*
* @param sockTimeout Socket connection timeout.
*/
@@ -707,7 +690,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* If acknowledgement is not received within this timeout, sending is considered as failed
* and SPI tries to repeat message sending.
* <p>
- * If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}.
+ * If not specified, default is {@link IgniteConfiguration#getSpiCommunicationTimeout()}.
*
* @param ackTimeout Acknowledgement timeout.
*/
@@ -719,13 +702,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/**
+ * Deprecated, use {@link IgniteConfiguration#setSpiCommunicationTimeout(long)} instead.
+ *
* Sets maximum network timeout to use for network operations.
* <p>
- * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}.
+ * If not specified, default is {@link IgniteConfiguration#getSpiCommunicationTimeout()}.
*
* @param netTimeout Network timeout.
*/
@IgniteSpiConfiguration(optional = true)
+ @Deprecated
public TcpDiscoverySpi setNetworkTimeout(long netTimeout) {
this.netTimeout = netTimeout;
@@ -776,7 +762,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
* in configurable time interval to other nodes to notify them about its state.
* <p>
- * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
+ * If not provided, default value is {@link IgniteConfiguration#getSpiCommunicationTimeout()} divided by two and a
+ * half.
*
* @param hbFreq Heartbeat frequency in milliseconds.
*/
@@ -927,17 +914,17 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** {@inheritDoc} */
@Override public long getSocketTimeout() {
- return sockTimeout;
+ return sockTimeout != null ? sockTimeout : defaultSpiCommunicationTimeout();
}
/** {@inheritDoc} */
@Override public long getAckTimeout() {
- return ackTimeout;
+ return ackTimeout != null ? ackTimeout : defaultSpiCommunicationTimeout();
}
/** {@inheritDoc} */
@Override public long getNetworkTimeout() {
- return netTimeout;
+ return netTimeout != null ? netTimeout : defaultSpiCommunicationTimeout();
}
/** {@inheritDoc} */
@@ -947,7 +934,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** {@inheritDoc} */
@Override public long getHeartbeatFrequency() {
- return hbFreq;
+ return hbFreq != null ? hbFreq : (long)(defaultSpiCommunicationTimeout() / HEARTBEAT_DIVIDER);
}
/** {@inheritDoc} */
@@ -1112,7 +1099,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
sock.setTcpNoDelay(true);
- sock.connect(resolved, (int)sockTimeout);
+ sock.connect(resolved, sockTimeout.intValue());
writeToSocket(sock, U.IGNITE_HEADER);
@@ -1540,26 +1527,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
- if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
- if (ackTimeout == 0)
- ackTimeout = DFLT_ACK_TIMEOUT_CLIENT;
+ long commTimeout = defaultSpiCommunicationTimeout();
+
+ if (ackTimeout == null)
+ ackTimeout = commTimeout;
+
+ if (sockTimeout == null)
+ sockTimeout = commTimeout;
+
+ if (netTimeout == null)
+ netTimeout = commTimeout;
+
+ if (hbFreq == null)
+ hbFreq = (long)(commTimeout / HEARTBEAT_DIVIDER);
- if (sockTimeout == 0)
- sockTimeout = DFLT_SOCK_TIMEOUT_CLIENT;
+ if (maxAckTimeout == null)
+ // Default max is ackTimeout * 16 (ackTimeout doubled four times).
+ maxAckTimeout = defaultMaxAckTimeout();
+ if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
impl = new ClientImpl(this);
ctxInitLatch.countDown();
}
- else {
- if (ackTimeout == 0)
- ackTimeout = DFLT_ACK_TIMEOUT;
-
- if (sockTimeout == 0)
- sockTimeout = DFLT_SOCK_TIMEOUT;
-
+ else
impl = new ServerImpl(this);
- }
assertParameter(ipFinder != null, "ipFinder != null");
assertParameter(hbFreq > 0, "heartbeatFreq > 0");
@@ -1764,6 +1756,33 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/**
+ * Default SPI communication timeout.
+ *
+ * @return SPI communication timeout.
+ */
+ private long defaultSpiCommunicationTimeout() {
+ if (ignite == null)
+ return IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT;
+
+ if (ignite().configuration().getSpiCommunicationTimeout() != null)
+ return ignite().configuration().getSpiCommunicationTimeout();
+
+ if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode())))
+ return IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT_CLIENT;
+ else
+ return IgniteConfiguration.DFLT_SPI_COMMUNICATION_TIMEOUT;
+ }
+
+ /**
+ * Default max ack timeout.
+ *
+ * @return Max acknowledgement timeout.
+ */
+ private long defaultMaxAckTimeout() {
+ return getAckTimeout() * (1 << 4);
+ }
+
+ /**
* Socket timeout object.
*/
private class SocketTimeoutObject implements IgniteSpiTimeoutObject {