You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2015/07/31 08:10:41 UTC
[05/50] [abbrv] incubator-ignite git commit: master: back merge from ignite-752
master: back merge from ignite-752
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cff25e91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cff25e91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cff25e91
Branch: refs/heads/ignite-843
Commit: cff25e91ac16fb11f3790690ec28d39a729519d9
Parents: ae148f1
Author: dmagda <ma...@gmail.com>
Authored: Fri Jul 24 15:32:51 2015 +0300
Committer: dmagda <ma...@gmail.com>
Committed: Fri Jul 24 15:32:51 2015 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 35 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 58 +++
.../spi/IgniteSpiOperationTimeoutException.java | 43 ++
.../spi/IgniteSpiOperationTimeoutHelper.java | 102 ++++
.../communication/tcp/TcpCommunicationSpi.java | 122 ++++-
.../ignite/spi/discovery/tcp/ClientImpl.java | 52 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 509 +++++++++++--------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 11 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 135 +++--
.../tcp/internal/TcpDiscoveryNode.java | 21 +
.../TcpDiscoveryConnectionCheckMessage.java | 64 +++
.../IgniteClientReconnectAbstractTest.java | 4 +-
.../GridTcpCommunicationSpiAbstractTest.java | 2 +-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 3 +-
...tionSpiRecoveryFailureDetectionSelfTest.java | 54 ++
...GridTcpCommunicationSpiRecoverySelfTest.java | 23 +-
...unicationSpiTcpFailureDetectionSelfTest.java | 75 +++
.../discovery/AbstractDiscoverySelfTest.java | 23 +-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 205 ++++++++
.../tcp/TcpClientDiscoverySpiSelfTest.java | 116 +++--
.../tcp/TcpDiscoverySpiConfigSelfTest.java | 1 +
.../TcpDiscoverySpiFailureTimeoutSelfTest.java | 402 +++++++++++++++
.../IgniteSpiCommunicationSelfTestSuite.java | 3 +
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
24 files changed, 1749 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 823ddcd..aac1754 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -190,6 +190,11 @@ public class IgniteConfiguration {
/** Default value for cache sanity check enabled flag. */
public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
+ /** Default failure detection timeout in millis. */
+ @SuppressWarnings("UnnecessaryBoxing")
+// public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000);
+ public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000);
+
/** Optional grid name. */
private String gridName;
@@ -367,6 +372,9 @@ public class IgniteConfiguration {
/** Port number range for time server. */
private int timeSrvPortRange = DFLT_TIME_SERVER_PORT_RANGE;
+ /** Failure detection timeout. */
+ private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT;
+
/** Property names to include into node attributes. */
private String[] includeProps;
@@ -449,7 +457,7 @@ public class IgniteConfiguration {
consistentId = cfg.getConsistentId();
deployMode = cfg.getDeploymentMode();
discoStartupDelay = cfg.getDiscoveryStartupDelay();
- pubPoolSize = cfg.getPublicThreadPoolSize();
+ failureDetectionTimeout = cfg.getFailureDetectionTimeout();
ggHome = cfg.getIgniteHome();
ggWork = cfg.getWorkDirectory();
gridName = cfg.getGridName();
@@ -479,6 +487,7 @@ public class IgniteConfiguration {
p2pMissedCacheSize = cfg.getPeerClassLoadingMissedResourcesCacheSize();
p2pPoolSize = cfg.getPeerClassLoadingThreadPoolSize();
pluginCfgs = cfg.getPluginConfigurations();
+ pubPoolSize = cfg.getPublicThreadPoolSize();
segChkFreq = cfg.getSegmentCheckFrequency();
segPlc = cfg.getSegmentationPolicy();
segResolveAttempts = cfg.getSegmentationResolveAttempts();
@@ -1655,6 +1664,30 @@ public class IgniteConfiguration {
}
/**
+ * Returns failure detection timeout used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}.
+ * <p>
+ * Default is {@link #DFLT_FAILURE_DETECTION_TIMEOUT}.
+ *
+ * @see #setFailureDetectionTimeout(long)
+ * @return Failure detection timeout in milliseconds.
+ */
+ public Long getFailureDetectionTimeout() {
+ return failureDetectionTimeout;
+ }
+
+ /**
+ * Sets failure detection timeout to use in {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}.
+ * <p>
+ * Failure detection timeout is used to determine how long the communication or discovery SPIs should wait before
+ * considering a remote connection failed.
+ *
+ * @param failureDetectionTimeout Failure detection timeout in milliseconds.
+ */
+ public void setFailureDetectionTimeout(long failureDetectionTimeout) {
+ this.failureDetectionTimeout = failureDetectionTimeout;
+ }
+
+ /**
* Should return fully configured load balancing SPI implementation. If not provided,
* {@link RoundRobinLoadBalancingSpi} will be used.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 2f3def9..f809d82 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
@@ -74,6 +75,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** Local node. */
private ClusterNode locNode;
+ /** Failure detection timeout usage switch. */
+ private boolean failureDetectionTimeoutEnabled = true;
+
+ /**
+ * Failure detection timeout. Initialized with the value of
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()}.
+ */
+ private long failureDetectionTimeout;
+
/**
* Creates new adapter and initializes it from the current (this) class.
* SPI name will be initialized to the simple name of the class
@@ -583,6 +593,54 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
}
/**
+ * Initiates and checks failure detection timeout value.
+ */
+ protected void initFailureDetectionTimeout() {
+ if (failureDetectionTimeoutEnabled) {
+ failureDetectionTimeout = ignite.configuration().getFailureDetectionTimeout();
+
+ if (failureDetectionTimeout <= 0)
+ throw new IgniteSpiException("Invalid failure detection timeout value: " + failureDetectionTimeout);
+ else if (failureDetectionTimeout <= 10)
+ // Because U.currentTimeInMillis() is updated once in 10 milliseconds.
+ log.warning("Failure detection timeout is too low, it may lead to unpredictable behaviour " +
+ "[failureDetectionTimeout=" + failureDetectionTimeout + ']');
+ }
+ // Intentionally compare references using '!=' below
+ else if (ignite.configuration().getFailureDetectionTimeout() !=
+ IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT)
+ log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)");
+
+ }
+
+ /**
+ * Enables or disables failure detection timeout.
+ *
+ * @param enabled {@code true} if enable, {@code false} otherwise.
+ */
+ public void failureDetectionTimeoutEnabled(boolean enabled) {
+ failureDetectionTimeoutEnabled = enabled;
+ }
+
+ /**
+ * Checks whether failure detection timeout is enabled for this {@link IgniteSpi}.
+ *
+ * @return {@code true} if enabled, {@code false} otherwise.
+ */
+ public boolean failureDetectionTimeoutEnabled() {
+ return failureDetectionTimeoutEnabled;
+ }
+
+ /**
+ * Returns failure detection timeout set to use for network related operations.
+ *
+ * @return failure detection timeout in milliseconds or {@code 0} if the timeout is disabled.
+ */
+ public long failureDetectionTimeout() {
+ return failureDetectionTimeout;
+ }
+
+ /**
* Temporarily SPI context.
*/
private class GridDummySpiContext implements IgniteSpiContext {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
new file mode 100644
index 0000000..0e34cf2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ * Kind of exception that is used when failure detection timeout is enabled for {@link TcpDiscoverySpi} or
+ * {@link TcpCommunicationSpi}.
+ *
+ * For more information refer to {@link IgniteConfiguration#setFailureDetectionTimeout(long)} and
+ * {@link IgniteSpiOperationTimeoutHelper}.
+ */
+public class IgniteSpiOperationTimeoutException extends IgniteCheckedException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Constructor.
+ * @param msg Error message.
+ */
+ public IgniteSpiOperationTimeoutException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
new file mode 100644
index 0000000..f7d8daa
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.net.*;
+
+/**
+ * Object that incorporates logic that determines a timeout value for the next network related operation and checks
+ * whether a failure detection timeout is reached or not.
+ *
+ * A new instance of the class should be created for every complex network based operations that usually consists of
+ * request and response parts.
+ */
+public class IgniteSpiOperationTimeoutHelper {
+ /** */
+ private long lastOperStartTs;
+
+ /** */
+ private long timeout;
+
+ /** */
+ private final boolean failureDetectionTimeoutEnabled;
+
+ /** */
+ private final long failureDetectionTimeout;
+
+ /**
+ * Constructor.
+ *
+ * @param adapter SPI adapter.
+ */
+ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) {
+ failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled();
+ failureDetectionTimeout = adapter.failureDetectionTimeout();
+ }
+
+ /**
+ * Returns a timeout value to use for the next network operation.
+ *
+ * If failure detection timeout is enabled then the returned value is a portion of time left since the last time
+ * this method is called. If the timeout is disabled then {@code dfltTimeout} is returned.
+ *
+ * @param dfltTimeout Timeout to use if failure detection timeout is disabled.
+ * @return Timeout in milliseconds.
+ * @throws IgniteSpiOperationTimeoutException If failure detection timeout is reached for an operation that uses
+ * this {@code IgniteSpiOperationTimeoutController}.
+ */
+ public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
+ if (!failureDetectionTimeoutEnabled)
+ return dfltTimeout;
+
+ if (lastOperStartTs == 0) {
+ timeout = failureDetectionTimeout;
+ lastOperStartTs = U.currentTimeMillis();
+ }
+ else {
+ long curTs = U.currentTimeMillis();
+
+ timeout = timeout - (curTs - lastOperStartTs);
+
+ lastOperStartTs = curTs;
+
+ if (timeout <= 0)
+ throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+ "'failureDetectionTimeout' configuration property [failureDetectionTimeout="
+ + failureDetectionTimeout + ']');
+ }
+
+ return timeout;
+ }
+
+ /**
+ * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached.
+ *
+ * @param e Exception.
+ * @return {@code true} if failure detection timeout is reached, {@code false} otherwise.
+ */
+ public boolean checkFailureTimeoutReached(Exception e) {
+ if (!failureDetectionTimeoutEnabled)
+ return false;
+
+ return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
+ X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e9fd696..7be1dbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -73,7 +73,21 @@ import static org.apache.ignite.events.EventType.*;
* {@link #DFLT_IDLE_CONN_TIMEOUT} period and then are closed. Use
* {@link #setIdleConnectionTimeout(long)} configuration parameter to configure
* you own idle connection timeout.
+ * <h1 class="header">Failure Detection</h1>
+ * Configuration defaults (see Configuration section below and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()}) for details) are chosen to make possible for
+ * communication SPI work reliably on most of hardware and virtual deployments, but this has made failure detection
+ * time worse.
* <p>
+ * If it's needed to tune failure detection then it's highly recommended to do this using
+ * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the
+ * following parameters: {@link #getConnectTimeout()}, {@link #getMaxConnectTimeout()},
+ * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be
+ * ignored.
+ * <p>
+ * If it's required to perform advanced settings of failure detection and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpCommunicationSpi}
+ * configuration parameters may be used.
* <h1 class="header">Configuration</h1>
* <h2 class="header">Mandatory</h2>
* This SPI has no mandatory configuration parameters.
@@ -991,12 +1005,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* {@code 0} is interpreted as infinite timeout.
* <p>
* If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
+ * <p>
+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param connTimeout Connect timeout.
*/
@IgniteSpiConfiguration(optional = true)
public void setConnectTimeout(long connTimeout) {
this.connTimeout = connTimeout;
+
+ failureDetectionTimeoutEnabled(false);
}
/** {@inheritDoc} */
@@ -1013,12 +1031,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* {@code 0} is interpreted as infinite timeout.
* <p>
* If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
+ * <p>
+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param maxConnTimeout Maximum connect timeout.
*/
@IgniteSpiConfiguration(optional = true)
public void setMaxConnectTimeout(long maxConnTimeout) {
this.maxConnTimeout = maxConnTimeout;
+
+ failureDetectionTimeoutEnabled(false);
}
/** {@inheritDoc} */
@@ -1031,12 +1053,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* with remote nodes.
* <p>
* If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
+ * <p>
+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param reconCnt Maximum number of reconnection attempts.
*/
@IgniteSpiConfiguration(optional = true)
public void setReconnectCount(int reconCnt) {
this.reconCnt = reconCnt;
+
+ failureDetectionTimeoutEnabled(false);
}
/** {@inheritDoc} */
@@ -1264,6 +1290,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** {@inheritDoc} */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
+ initFailureDetectionTimeout();
+
assertParameter(locPort > 1023, "locPort > 1023");
assertParameter(locPort <= 0xffff, "locPort < 0xffff");
assertParameter(locPortRange >= 0, "locPortRange >= 0");
@@ -1272,10 +1300,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1");
- assertParameter(reconCnt > 0, "reconnectCnt > 0");
assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
- assertParameter(connTimeout >= 0, "connTimeout >= 0");
- assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+
+ if (!failureDetectionTimeoutEnabled()) {
+ assertParameter(reconCnt > 0, "reconnectCnt > 0");
+ assertParameter(connTimeout >= 0, "connTimeout >= 0");
+ assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+ }
+
assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
@@ -1351,9 +1383,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("sockRcvBuf", sockRcvBuf));
log.debug(configInfo("shmemPort", shmemPort));
log.debug(configInfo("msgQueueLimit", msgQueueLimit));
- log.debug(configInfo("connTimeout", connTimeout));
- log.debug(configInfo("maxConnTimeout", maxConnTimeout));
- log.debug(configInfo("reconCnt", reconCnt));
+
+ if (failureDetectionTimeoutEnabled()) {
+ log.debug(configInfo("connTimeout", connTimeout));
+ log.debug(configInfo("maxConnTimeout", maxConnTimeout));
+ log.debug(configInfo("reconCnt", reconCnt));
+ }
+ else
+ log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout()));
+
log.debug(configInfo("sockWriteTimeout", sockWriteTimeout));
log.debug(configInfo("ackSndThreshold", ackSndThreshold));
log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
@@ -1906,17 +1944,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long connTimeout0 = connTimeout;
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+
while (true) {
GridCommunicationClient client;
try {
client = new GridShmemCommunicationClient(metricsLsnr,
port,
- connTimeout,
+ timeoutHelper.nextTimeoutChunk(connTimeout),
log,
getSpiContext().messageFormatter());
}
catch (IgniteCheckedException e) {
+ if (timeoutHelper.checkFailureTimeoutReached(e))
+ throw e;
+
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
connectAttempts++;
@@ -1928,15 +1971,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- safeHandshake(client, null, node.id(), connTimeout0);
+ safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
}
- catch (HandshakeTimeoutException e) {
+ catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
+ client.forceClose();
+
+ if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+ timeoutHelper.checkFailureTimeoutReached(e))) {
+ log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
+ failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
+
+ throw e;
+ }
+
+ assert !failureDetectionTimeoutEnabled();
+
if (log.isDebugEnabled())
- log.debug("Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+ log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
", err=" + e.getMessage() + ", client=" + client + ']');
- client.forceClose();
-
if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
if (log.isDebugEnabled())
log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
@@ -2050,6 +2103,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
int attempt = 1;
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+
while (!conn) { // Reconnection on handshake timeout.
try {
SocketChannel ch = SocketChannel.open();
@@ -2076,9 +2131,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long rcvCnt = -1;
try {
- ch.socket().connect(addr, (int)connTimeout);
+ ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
- rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
+ rcvCnt = safeHandshake(ch, recoveryDesc, node.id(),
+ timeoutHelper.nextTimeoutChunk(connTimeout0));
if (rcvCnt == -1)
return null;
@@ -2112,19 +2168,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
}
- catch (HandshakeTimeoutException e) {
+ catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
if (client != null) {
client.forceClose();
client = null;
}
+ if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
+ timeoutHelper.checkFailureTimeoutReached(e))) {
+
+ String msg = "Handshake timed out (failure detection timeout is reached) " +
+ "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']';
+
+ onException(msg, e);
+
+ if (log.isDebugEnabled())
+ log.debug(msg);
+
+ if (errs == null)
+ errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
+ "Make sure that each GridComputeTask and GridCacheTransaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+
+ errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+
+ break;
+ }
+
+ assert !failureDetectionTimeoutEnabled();
+
onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
", addr=" + addr + ']', e);
if (log.isDebugEnabled())
log.debug(
- "Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
+ "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
", addr=" + addr + ", err=" + e + ']');
if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
@@ -2164,7 +2244,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
- if (X.hasCause(e, SocketTimeoutException.class))
+ boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
+
+ if (failureDetThrReached)
+ LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
+ "configuration property) [addr=" + addr + ", failureDetectionTimeout=" +
+ failureDetectionTimeout() + ']');
+ else if (X.hasCause(e, SocketTimeoutException.class))
LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
"configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
@@ -2177,7 +2263,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
// Reconnect for the second time, if connection is not established.
- if (connectAttempts < 2 &&
+ if (!failureDetThrReached && connectAttempts < 2 &&
(e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
connectAttempts++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 572ba2c..12b10b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -480,13 +480,17 @@ class ClientImpl extends TcpDiscoveryImpl {
Collection<Throwable> errs = null;
- long ackTimeout0 = spi.ackTimeout;
+ long ackTimeout0 = spi.getAckTimeout();
+
+ int reconCnt = 0;
int connectAttempts = 1;
UUID locNodeId = getLocalNodeId();
- for (int i = 0; i < spi.reconCnt; i++) {
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
+ while (true) {
boolean openSock = false;
Socket sock = null;
@@ -494,7 +498,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr);
+ sock = spi.openSocket(addr, timeoutHelper);
openSock = true;
@@ -502,7 +506,7 @@ class ClientImpl extends TcpDiscoveryImpl {
req.client(true);
- spi.writeToSocket(sock, req);
+ spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
@@ -532,7 +536,7 @@ class ClientImpl extends TcpDiscoveryImpl {
msg.client(true);
- spi.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -540,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl {
log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
", rmtNodeId=" + rmtNodeId + ']');
- return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), res.clientAck());
+ return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
+ res.clientAck());
}
catch (IOException | IgniteCheckedException e) {
U.closeQuiet(sock);
@@ -555,6 +560,12 @@ class ClientImpl extends TcpDiscoveryImpl {
errs.add(e);
+ if (timeoutHelper.checkFailureTimeoutReached(e))
+ break;
+
+ if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
+ break;
+
if (!openSock) {
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2) {
@@ -566,7 +577,8 @@ class ClientImpl extends TcpDiscoveryImpl {
break; // Don't retry if we can not establish connection.
}
- if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+ if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException ||
+ X.hasCause(e, SocketTimeoutException.class))) {
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
@@ -868,6 +880,9 @@ class ClientImpl extends TcpDiscoveryImpl {
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
/** */
+ private final long socketTimeout;
+
+ /** */
private TcpDiscoveryAbstractMessage unackedMsg;
/**
@@ -875,6 +890,9 @@ class ClientImpl extends TcpDiscoveryImpl {
*/
protected SocketWriter() {
super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
+
+ socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ spi.getSocketTimeout();
}
/**
@@ -968,12 +986,13 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
- spi.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg, socketTimeout);
msg = null;
if (ack) {
- long waitEnd = U.currentTimeMillis() + spi.ackTimeout;
+ long waitEnd = U.currentTimeMillis() + (spi.failureDetectionTimeoutEnabled() ?
+ spi.failureDetectionTimeout() : spi.getAckTimeout());
TcpDiscoveryAbstractMessage unacked;
@@ -989,7 +1008,10 @@ class ClientImpl extends TcpDiscoveryImpl {
if (unacked != null) {
if (log.isDebugEnabled())
log.debug("Failed to get acknowledge for message, will try to reconnect " +
- "[msg=" + unacked + ", timeout=" + spi.ackTimeout + ']');
+ "[msg=" + unacked +
+ (spi.failureDetectionTimeoutEnabled() ?
+ ", failureDetectionTimeout=" + spi.failureDetectionTimeout() :
+ ", timeout=" + spi.getAckTimeout()) + ']');
throw new IOException("Failed to get acknowledge for message: " + unacked);
}
@@ -1068,11 +1090,11 @@ class ClientImpl extends TcpDiscoveryImpl {
if (join) {
joinError(new IgniteSpiException("Join process timed out, connection failed and " +
"failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
- "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+ "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
}
else
- U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
- "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+ U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" +
+ " configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index dc343eb..b4f89ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -80,14 +80,6 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Client message workers. */
protected ConcurrentMap<UUID, ClientMessageWorker> clientMsgWorkers = new ConcurrentHashMap8<>();
- /** Metrics sender. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private HeartbeatsSender hbsSnd;
-
- /** Status checker. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private CheckStatusSender chkStatusSnd;
-
/** IP finder cleaner. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private IpFinderCleaner ipFinderCleaner;
@@ -229,12 +221,6 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onJoinFinished();
- hbsSnd = new HeartbeatsSender();
- hbsSnd.start();
-
- chkStatusSnd = new CheckStatusSender();
- chkStatusSnd.start();
-
if (spi.ipFinder.isShared()) {
ipFinderCleaner = new IpFinderCleaner();
ipFinderCleaner.start();
@@ -278,10 +264,10 @@ class ServerImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
synchronized (mux) {
- long threshold = U.currentTimeMillis() + spi.netTimeout;
-
long timeout = spi.netTimeout;
+ long threshold = U.currentTimeMillis() + timeout;
+
while (spiState != LEFT && timeout > 0) {
try {
mux.wait(timeout);
@@ -319,12 +305,6 @@ class ServerImpl extends TcpDiscoveryImpl {
U.interrupt(tmp);
U.joinThreads(tmp, log);
- U.interrupt(hbsSnd);
- U.join(hbsSnd, log);
-
- U.interrupt(chkStatusSnd);
- U.join(chkStatusSnd, log);
-
U.interrupt(ipFinderCleaner);
U.join(ipFinderCleaner, log);
@@ -482,6 +462,8 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
if (F.contains(spi.locNodeAddrs, addr)) {
if (clientNodeId == null)
return F.t(getLocalNodeId(), false);
@@ -494,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean clientPingRes;
try {
- clientPingRes = clientWorker.ping();
+ clientPingRes = clientWorker.ping(timeoutHelper);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -517,18 +499,26 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
Socket sock = null;
- for (int i = 0; i < spi.reconCnt; i++) {
+ int reconCnt = 0;
+
+ boolean openedSock = false;
+
+ while (true) {
try {
if (addr.isUnresolved())
addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr);
+ sock = spi.openSocket(addr, timeoutHelper);
- spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId));
+ openedSock = true;
- TcpDiscoveryPingResponse res = spi.readMessage(sock, null, spi.netTimeout);
+ spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
+ timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+
+ TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
+ spi.getAckTimeout()));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -550,6 +540,16 @@ class ServerImpl extends TcpDiscoveryImpl {
errs = new ArrayList<>();
errs.add(e);
+
+ reconCnt++;
+
+ if (!openedSock && reconCnt == 2)
+ break;
+
+ if (timeoutHelper.checkFailureTimeoutReached(e))
+ break;
+ else if (!spi.failureDetectionTimeoutEnabled() && reconCnt == spi.getReconnectCount())
+ break;
}
finally {
U.closeQuiet(sock);
@@ -607,6 +607,12 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ /** {@inheritDoc} */
+ @Override protected void onDataReceived() {
+ if (spi.failureDetectionTimeoutEnabled() && locNode != null)
+ locNode.lastDataReceivedTime(U.currentTimeMillis());
+ }
+
/**
* Tries to join this node to topology.
*
@@ -678,10 +684,10 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Join request message has been sent (waiting for coordinator response).");
synchronized (mux) {
- long threshold = U.currentTimeMillis() + spi.netTimeout;
-
long timeout = spi.netTimeout;
+ long threshold = U.currentTimeMillis() + timeout;
+
while (spiState == CONNECTING && timeout > 0) {
try {
mux.wait(timeout);
@@ -883,15 +889,19 @@ class ServerImpl extends TcpDiscoveryImpl {
Collection<Throwable> errs = null;
- long ackTimeout0 = spi.ackTimeout;
+ long ackTimeout0 = spi.getAckTimeout();
int connectAttempts = 1;
- boolean joinReqSent = false;
+ boolean joinReqSent;
UUID locNodeId = getLocalNodeId();
- for (int i = 0; i < spi.reconCnt; i++) {
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
+ int reconCnt = 0;
+
+ while (true){
// Need to set to false on each new iteration,
// since remote node may leave in the middle of the first iteration.
joinReqSent = false;
@@ -903,14 +913,16 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr);
+ sock = spi.openSocket(addr, timeoutHelper);
openSock = true;
// Handshake.
- spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+ spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
+ spi.getSocketTimeout()));
- TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
+ ackTimeout0));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -924,7 +936,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Send message.
tstamp = U.currentTimeMillis();
- spi.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -941,7 +953,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// E.g. due to class not found issue.
joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
- return spi.readReceipt(sock, ackTimeout0);
+ return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
}
catch (ClassCastException e) {
// This issue is rarely reproducible on AmazonEC2, but never
@@ -967,6 +979,12 @@ class ServerImpl extends TcpDiscoveryImpl {
errs.add(e);
+ if (timeoutHelper.checkFailureTimeoutReached(e))
+ break;
+
+ if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
+ break;
+
if (!openSock) {
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2) {
@@ -978,7 +996,8 @@ class ServerImpl extends TcpDiscoveryImpl {
break; // Don't retry if we can not establish connection.
}
- if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+ if (!spi.failureDetectionTimeoutEnabled() && (e instanceof SocketTimeoutException ||
+ X.hasCause(e, SocketTimeoutException.class))) {
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
@@ -1256,12 +1275,6 @@ class ServerImpl extends TcpDiscoveryImpl {
U.interrupt(tcpSrvr);
U.join(tcpSrvr, log);
- U.interrupt(hbsSnd);
- U.join(hbsSnd, log);
-
- U.interrupt(chkStatusSnd);
- U.join(chkStatusSnd, log);
-
U.interrupt(ipFinderCleaner);
U.join(ipFinderCleaner, log);
@@ -1350,8 +1363,7 @@ class ServerImpl extends TcpDiscoveryImpl {
b.append("Internal threads: ").append(U.nl());
b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
- b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
- b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
+
b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
@@ -1398,7 +1410,8 @@ class ServerImpl extends TcpDiscoveryImpl {
private boolean recordable(TcpDiscoveryAbstractMessage msg) {
return !(msg instanceof TcpDiscoveryHeartbeatMessage) &&
!(msg instanceof TcpDiscoveryStatusCheckMessage) &&
- !(msg instanceof TcpDiscoveryDiscardMessage);
+ !(msg instanceof TcpDiscoveryDiscardMessage) &&
+ !(msg instanceof TcpDiscoveryConnectionCheckMessage);
}
/**
@@ -1434,112 +1447,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Thread that sends heartbeats.
- */
- private class HeartbeatsSender extends IgniteSpiThread {
- /**
- * Constructor.
- */
- private HeartbeatsSender() {
- super(spi.ignite().name(), "tcp-disco-hb-sender", log);
-
- setPriority(spi.threadPri);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("BusyWait")
- @Override protected void body() throws InterruptedException {
- while (!isLocalNodeCoordinator())
- Thread.sleep(1000);
-
- if (log.isDebugEnabled())
- log.debug("Heartbeats sender has been started.");
-
- UUID nodeId = getConfiguredNodeId();
-
- while (!isInterrupted()) {
- if (spiStateCopy() != CONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Stopping heartbeats sender (SPI is not connected to topology).");
-
- return;
- }
-
- TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId);
-
- msg.verify(getLocalNodeId());
-
- msgWorker.addMessage(msg);
-
- Thread.sleep(spi.hbFreq);
- }
- }
- }
-
- /**
- * Thread that sends status check messages to next node if local node has not
- * been receiving heartbeats ({@link TcpDiscoveryHeartbeatMessage})
- * for {@link TcpDiscoverySpi#getMaxMissedHeartbeats()} *
- * {@link TcpDiscoverySpi#getHeartbeatFrequency()}.
- */
- private class CheckStatusSender extends IgniteSpiThread {
- /**
- * Constructor.
- */
- private CheckStatusSender() {
- super(spi.ignite().name(), "tcp-disco-status-check-sender", log);
-
- setPriority(spi.threadPri);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("BusyWait")
- @Override protected void body() throws InterruptedException {
- if (log.isDebugEnabled())
- log.debug("Status check sender has been started.");
-
- // Only 1 heartbeat missing is acceptable. Add 50 ms to avoid false alarm.
- long checkTimeout = (long)spi.maxMissedHbs * spi.hbFreq + 50;
-
- long lastSent = 0;
-
- while (!isInterrupted()) {
- // 1. Determine timeout.
- if (lastSent < locNode.lastUpdateTime())
- lastSent = locNode.lastUpdateTime();
-
- long timeout = (lastSent + checkTimeout) - U.currentTimeMillis();
-
- if (timeout > 0)
- Thread.sleep(timeout);
-
- // 2. Check if SPI is still connected.
- if (spiStateCopy() != CONNECTED) {
- if (log.isDebugEnabled())
- log.debug("Stopping status check sender (SPI is not connected to topology).");
-
- return;
- }
-
- // 3. Was there an update?
- if (locNode.lastUpdateTime() > lastSent || !ring.hasRemoteNodes()) {
- if (log.isDebugEnabled())
- log.debug("Skipping status check send " +
- "[locNodeLastUpdate=" + U.format(locNode.lastUpdateTime()) +
- ", hasRmts=" + ring.hasRemoteNodes() + ']');
-
- continue;
- }
-
- // 4. Send status check message.
- lastSent = U.currentTimeMillis();
-
- msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
- }
- }
- }
-
- /**
* Thread that cleans IP finder and keeps it in the correct state, unregistering
* addresses of the nodes that has left the topology.
* <p>
@@ -1861,10 +1768,49 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Socket. */
private Socket sock;
+ /** Last time status message has been sent. */
+ private long lastTimeStatusMsgSent;
+
+ /** Incoming heartbeats check frequency. */
+ private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50;
+
+ /** Last time heartbeat message has been sent. */
+ private long lastTimeHbMsgSent;
+
+ /** Time when the last status message has been sent. */
+ private long lastTimeConnCheckMsgSent;
+
+ /** Flag that keeps info on whether the threshold is reached or not. */
+ private boolean failureThresholdReached;
+
+ /** Connection check frequency. */
+ private long connCheckFreq;
+
/**
*/
protected RingMessageWorker() {
- super("tcp-disco-msg-worker");
+ super("tcp-disco-msg-worker", 10);
+
+ initConnectionCheckFrequency();
+ }
+
+ /**
+ * Initializes connection check frequency. Used only when failure detection timeout is enabled.
+ */
+ private void initConnectionCheckFrequency() {
+ if (spi.failureDetectionTimeoutEnabled()) {
+ for (int i = 3; i > 0; i--) {
+ connCheckFreq = spi.failureDetectionTimeout() / i;
+
+ if (connCheckFreq > 0)
+ break;
+ }
+
+ assert connCheckFreq > 0;
+
+ if (log.isDebugEnabled())
+ log.debug("Connection check frequency is calculated: " + connCheckFreq);
+ }
}
/**
@@ -1921,9 +1867,25 @@ class ServerImpl extends TcpDiscoveryImpl {
if (spi.ensured(msg))
msgHist.add(msg);
+ if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId()))
+ // Reset the flag.
+ failureThresholdReached = false;
+
spi.stats.onMessageProcessingFinished(msg);
}
+ /** {@inheritDoc} */
+ @Override protected void noMessageLoop() {
+ if (locNode == null)
+ return;
+
+ checkConnection();
+
+ sendHeartbeatMessage();
+
+ checkHeartbeatsReceiving();
+ }
+
/**
* Sends message across the ring.
*
@@ -1990,7 +1952,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode)
debugLog("No next node in topology.");
- if (ring.hasRemoteNodes()) {
+ if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) &&
+ !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) {
msg.senderNodeId(locNodeId);
addMessage(msg);
@@ -2027,7 +1990,7 @@ class ServerImpl extends TcpDiscoveryImpl {
List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());
addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) {
- long ackTimeout0 = spi.ackTimeout;
+ long ackTimeout0 = spi.getAckTimeout();
if (locNodeAddrs.contains(addr)){
if (log.isDebugEnabled())
@@ -2037,8 +2000,15 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
- for (int i = 0; i < spi.reconCnt; i++) {
+ int reconCnt = 0;
+
+ IgniteSpiOperationTimeoutHelper timeoutHelper = null;
+
+ while (true) {
if (sock == null) {
+ if (timeoutHelper == null)
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
nextNodeExists = false;
boolean success = false;
@@ -2049,14 +2019,16 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr);
+ sock = spi.openSocket(addr, timeoutHelper);
openSock = true;
// Handshake.
- writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+ writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
+ timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
- TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
+ timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -2140,8 +2112,13 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!openSock)
break; // Don't retry if we can not establish connection.
- if (e instanceof SocketTimeoutException ||
- X.hasCause(e, SocketTimeoutException.class)) {
+ if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == spi.getReconnectCount())
+ break;
+
+ if (timeoutHelper.checkFailureTimeoutReached(e))
+ break;
+ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
+ SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
@@ -2156,9 +2133,13 @@ class ServerImpl extends TcpDiscoveryImpl {
sock = null;
}
- else
+ else {
// Next node exists and accepts incoming messages.
nextNodeExists = true;
+ // Resetting timeout control object to let the code below to use a new one
+ // for the next bunch of operations.
+ timeoutHelper = null;
+ }
}
}
@@ -2195,8 +2176,12 @@ class ServerImpl extends TcpDiscoveryImpl {
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
pendingMsgs.discardId);
+ if (timeoutHelper == null)
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
try {
- writeToSocket(sock, pendingMsg);
+ writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk(
+ spi.getSocketTimeout()));
}
finally {
clearNodeAddedMessage(pendingMsg);
@@ -2204,7 +2189,7 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
- int res = spi.readReceipt(sock, ackTimeout0);
+ int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
log.debug("Pending message has been sent to next node [msg=" + msg.id() +
@@ -2215,19 +2200,34 @@ class ServerImpl extends TcpDiscoveryImpl {
debugLog("Pending message has been sent to next node [msg=" + msg.id() +
", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
", res=" + res + ']');
+
+ // Resetting timeout control object to create a new one for the next bunch of
+ // operations.
+ timeoutHelper = null;
}
}
- prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+ if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+ if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+ TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER,
+ TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER))
+ // Preserve backward compatibility with nodes of older versions.
+ msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
+ }
+ else
+ prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
try {
long tstamp = U.currentTimeMillis();
- writeToSocket(sock, msg);
+ if (timeoutHelper == null)
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+
+ writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
- int res = spi.readReceipt(sock, ackTimeout0);
+ int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
log.debug("Message has been sent to next node [msg=" + msg +
@@ -2262,11 +2262,19 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
e);
- if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
- ackTimeout0 *= 2;
+ if (timeoutHelper.checkFailureTimeoutReached(e))
+ break;
- if (!checkAckTimeout(ackTimeout0))
+ if (!spi.failureDetectionTimeoutEnabled()) {
+ if (++reconCnt == spi.getReconnectCount())
break;
+ else if (e instanceof SocketTimeoutException ||
+ X.hasCause(e, SocketTimeoutException.class)) {
+ ackTimeout0 *= 2;
+
+ if (!checkAckTimeout(ackTimeout0))
+ break;
+ }
}
}
finally {
@@ -2279,7 +2287,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Message has not been sent [next=" + next.id() + ", msg=" + msg +
- ", i=" + i + ']');
+ (!spi.failureDetectionTimeoutEnabled() ? ", i=" + reconCnt : "") + ']');
}
}
} // Try to reconnect.
@@ -3342,7 +3350,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (leftNode.equals(next) && sock != null) {
try {
- writeToSocket(sock, msg);
+ writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+ spi.failureDetectionTimeout() : spi.getSocketTimeout());
if (log.isDebugEnabled())
log.debug("Sent verified node left message to leaving node: " + msg);
@@ -3991,6 +4000,77 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
}
+
+ /**
+ * Sends heartbeat message if needed.
+ */
+ private void sendHeartbeatMessage() {
+ if (!isLocalNodeCoordinator())
+ return;
+
+ long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
+
+ if (elapsed > 0)
+ return;
+
+ TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
+
+ msg.verify(getLocalNodeId());
+
+ msgWorker.addMessage(msg);
+
+ lastTimeHbMsgSent = U.currentTimeMillis();
+ }
+
+ /**
+ * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than
+ * {@link TcpDiscoveryStatusCheckMessage} is sent accros the ring.
+ */
+ private void checkHeartbeatsReceiving() {
+ if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
+ lastTimeStatusMsgSent = locNode.lastUpdateTime();
+
+ long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+
+ if (elapsed > 0)
+ return;
+
+ msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null));
+
+ lastTimeStatusMsgSent = U.currentTimeMillis();
+ }
+
+ /**
+ * Check connection aliveness status.
+ */
+ private void checkConnection() {
+ if (!spi.failureDetectionTimeoutEnabled())
+ return;
+
+ if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime()
+ >= spi.failureDetectionTimeout() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
+
+ log.info("Local node seems to be disconnected from topology (failure detection timeout " +
+ "is reached): [failureDetectionTimeout=" + spi.failureDetectionTimeout() +
+ ", connCheckFreq=" + connCheckFreq + ']');
+
+ failureThresholdReached = true;
+
+ // Reset sent time deliberately to force sending connection check message.
+ lastTimeConnCheckMsgSent = 0;
+ }
+
+ long elapsed = (lastTimeConnCheckMsgSent + connCheckFreq) - U.currentTimeMillis();
+
+ if (elapsed > 0)
+ return;
+
+ if (ring.hasRemoteNodes()) {
+ sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode));
+
+ lastTimeConnCheckMsgSent = U.currentTimeMillis();
+ }
+ }
}
/**
@@ -4186,14 +4266,17 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
+ IgniteSpiOperationTimeoutHelper timeoutHelper =
+ new IgniteSpiOperationTimeoutHelper(spi);
+
if (req.clientNodeId() != null) {
ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
if (clientWorker != null)
- res.clientExists(clientWorker.ping());
+ res.clientExists(clientWorker.ping(timeoutHelper));
}
- spi.writeToSocket(sock, res);
+ spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
}
else if (log.isDebugEnabled())
log.debug("Ignore ping request, node is stopping.");
@@ -4214,7 +4297,8 @@ class ServerImpl extends TcpDiscoveryImpl {
if (req.client())
res.clientAck(true);
- spi.writeToSocket(sock, res);
+ spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ?
+ spi.failureDetectionTimeout() : spi.getSocketTimeout());
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
// the local node sends a handshake request message on the loopback address, so we get here.
@@ -4323,6 +4407,9 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
+ long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ spi.getSocketTimeout();
+
while (!isInterrupted()) {
try {
TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
@@ -4337,7 +4424,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode && recordable(msg))
debugLog("Message has been received: " + msg);
- if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+ if (msg instanceof TcpDiscoveryConnectionCheckMessage) {
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+
+ continue;
+ }
+ else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
if (!req.responded()) {
@@ -4355,7 +4447,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState state = spiStateCopy();
if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
if (clientMsgWrk.getState() == State.NEW)
clientMsgWrk.start();
@@ -4365,7 +4457,7 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
else {
- spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN);
+ spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, socketTimeout);
break;
}
@@ -4373,7 +4465,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
// Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
boolean ignored = false;
@@ -4402,7 +4494,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
// Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
boolean ignored = false;
@@ -4431,7 +4523,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
// Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
boolean ignored = false;
@@ -4460,7 +4552,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
// Send receipt back.
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
boolean ignored = false;
@@ -4509,7 +4601,7 @@ class ServerImpl extends TcpDiscoveryImpl {
clientMsgWrk.addMessage(ack);
}
else
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -4610,8 +4702,11 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState state = spiStateCopy();
+ long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ spi.getSocketTimeout();
+
if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK);
+ spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -4648,7 +4743,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Local node is stopping. Remote node should try next one.
res = RES_CONTINUE_JOIN;
- spi.writeToSocket(msg, sock, res);
+ spi.writeToSocket(msg, sock, res, socketTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
@@ -4741,7 +4836,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param clientNodeId Node ID.
*/
protected ClientMessageWorker(Socket sock, UUID clientNodeId) {
- super("tcp-disco-client-message-worker");
+ super("tcp-disco-client-message-worker", 2000);
this.sock = sock;
this.clientNodeId = clientNodeId;
@@ -4791,7 +4886,8 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
- writeToSocket(sock, msg);
+ writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+ spi.failureDetectionTimeout() : spi.getSocketTimeout());
}
}
else {
@@ -4802,7 +4898,8 @@ class ServerImpl extends TcpDiscoveryImpl {
prepareNodeAddedMessage(msg, clientNodeId, null, null);
- writeToSocket(sock, msg);
+ writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+ spi.failureDetectionTimeout() : spi.getSocketTimeout());
}
finally {
clearNodeAddedMessage(msg);
@@ -4836,10 +4933,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * @param timeoutHelper Timeout controller.
* @return Ping result.
* @throws InterruptedException If interrupted.
*/
- public boolean ping() throws InterruptedException {
+ public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
if (spi.isNodeStopping0())
return false;
@@ -4865,7 +4963,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
try {
- return fut.get(spi.ackTimeout, TimeUnit.MILLISECONDS);
+ return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()),
+ TimeUnit.MILLISECONDS);
}
catch (IgniteInterruptedCheckedException ignored) {
throw new InterruptedException();
@@ -4904,12 +5003,18 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Backed interrupted flag. */
private volatile boolean interrupted;
+ /** Polling timeout. */
+ private final long pollingTimeout;
+
/**
* @param name Thread name.
+ * @param pollingTimeout Messages polling timeout.
*/
- protected MessageWorkerAdapter(String name) {
+ protected MessageWorkerAdapter(String name, long pollingTimeout) {
super(spi.ignite().name(), name, log);
+ this.pollingTimeout = pollingTimeout;
+
setPriority(spi.threadPri);
}
@@ -4919,12 +5024,12 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS);
+ TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
if (msg == null)
- continue;
-
- processMessage(msg);
+ noMessageLoop();
+ else
+ processMessage(msg);
}
}
@@ -4968,16 +5073,24 @@ class ServerImpl extends TcpDiscoveryImpl {
protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
/**
+ * Called when there is no message to process giving ability to perform other activity.
+ */
+ protected void noMessageLoop() {
+ // No-op.
+ }
+
+ /**
* @param sock Socket.
* @param msg Message.
+ * @param timeout Socket timeout.
* @throws IOException If IO failed.
* @throws IgniteCheckedException If marshalling failed.
*/
- protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+ protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
throws IOException, IgniteCheckedException {
bout.reset();
- spi.writeToSocket(sock, msg, bout);
+ spi.writeToSocket(sock, msg, bout, timeout);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index c271b7c..4dacf45 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -131,6 +131,13 @@ abstract class TcpDiscoveryImpl {
}
/**
+ * Called when a chunk of data is received from a remote node.
+ */
+ protected void onDataReceived() {
+ // No-op
+ }
+
+ /**
* @param log Logger.
*/
public abstract void dumpDebugInfo(IgniteLogger log);
@@ -273,10 +280,10 @@ abstract class TcpDiscoveryImpl {
* maximum acknowledgement timeout, {@code false} otherwise.
*/
protected boolean checkAckTimeout(long ackTimeout) {
- if (ackTimeout > spi.maxAckTimeout) {
+ if (ackTimeout > spi.getMaxAckTimeout()) {
LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
"(consider increasing 'maxAckTimeout' configuration property) " +
- "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
+ "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']');
return false;
}