You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/04/26 13:26:00 UTC
[49/50] [abbrv] ignite git commit: ignite-4799 TcpDiscoverySpi:
removed missedHeartbeats properties,
heartbeatFrequency (use IgniteConfiguration.metricsUpdateFrequency instead).
Added IgniteConfiguration.clientFailureDetectionTimeout.
ignite-4799 TcpDiscoverySpi: removed missedHeartbeats properties, heartbeatFrequency (use IgniteConfiguration.metricsUpdateFrequency instead). Added IgniteConfiguration.clientFailureDetectionTimeout.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6998785a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6998785a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6998785a
Branch: refs/heads/master
Commit: 6998785a8861387b7ad83527a381dc5b772cf76f
Parents: c829aac
Author: Alexander Belyak <al...@xored.com>
Authored: Wed Apr 26 15:43:42 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 26 15:43:42 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/gridify/AbstractAopTest.java | 4 +-
.../gridify/ExternalNonSpringAopSelfTest.java | 6 +-
.../clients/src/test/resources/spring-cache.xml | 1 -
.../apache/ignite/cluster/ClusterMetrics.java | 4 +-
.../org/apache/ignite/cluster/ClusterNode.java | 11 +-
.../configuration/IgniteConfiguration.java | 52 ++-
.../org/apache/ignite/events/EventType.java | 2 +-
.../processors/job/GridJobProcessor.java | 18 +-
.../utils/PlatformConfigurationUtils.java | 32 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 32 ++
.../spi/IgniteSpiOperationTimeoutHelper.java | 8 +-
.../jobstealing/JobStealingCollisionSpi.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 6 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 31 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 142 ++++----
.../spi/discovery/tcp/TcpDiscoverySpi.java | 143 ++------
.../spi/discovery/tcp/TcpDiscoverySpiMBean.java | 26 +-
.../tcp/internal/TcpDiscoveryNode.java | 33 +-
.../TcpDiscoveryClientHeartbeatMessage.java | 72 ----
.../TcpDiscoveryClientMetricsUpdateMessage.java | 72 ++++
.../messages/TcpDiscoveryHeartbeatMessage.java | 338 -------------------
.../TcpDiscoveryMetricsUpdateMessage.java | 338 +++++++++++++++++++
.../adaptive/AdaptiveLoadBalancingSpi.java | 12 +-
.../resources/META-INF/classnames.properties | 8 +-
.../core/src/test/config/load/dsi-load-base.xml | 3 +-
.../src/test/config/load/merge-sort-base.xml | 7 +-
.../config/streamer/spring-streamer-base.xml | 5 +-
.../java/org/apache/ignite/GridTestJob.java | 19 ++
.../java/org/apache/ignite/GridTestTask.java | 18 +-
.../internal/ClusterNodeMetricsSelfTest.java | 10 +-
.../ignite/internal/GridAffinityMappedTest.java | 5 +-
.../internal/GridAffinityP2PSelfTest.java | 3 +-
.../ignite/internal/GridAffinitySelfTest.java | 3 +-
.../GridCancelledJobsMetricsSelfTest.java | 4 +-
...ridFailFastNodeFailureDetectionSelfTest.java | 4 +-
.../GridJobCollisionCancelSelfTest.java | 2 +-
.../GridDiscoveryManagerAliveCacheSelfTest.java | 4 +-
.../GridCacheAbstractFailoverSelfTest.java | 4 +-
.../cache/GridCacheAbstractSelfTest.java | 4 +-
.../cache/GridCacheMvccManagerSelfTest.java | 3 +-
.../cache/IgniteCacheAbstractTest.java | 10 +-
.../binary/BinaryMetadataUpdatesFlowTest.java | 4 +-
.../CacheLateAffinityAssignmentTest.java | 3 +-
.../GridCacheNodeFailureAbstractTest.java | 3 +-
.../distributed/IgniteCache150ClientsTest.java | 2 +-
.../IgniteCacheNearRestartRollbackSelfTest.java | 2 +-
...dCacheColocatedTxSingleThreadedSelfTest.java | 2 +-
.../dht/GridCacheDhtPreloadDelayedSelfTest.java | 2 +-
.../GridCacheDhtPreloadMessageCountTest.java | 2 +-
.../atomic/IgniteCacheAtomicProtocolTest.java | 2 +-
.../near/GridCacheNearMultiGetSelfTest.java | 2 +-
.../near/GridCacheNearMultiNodeSelfTest.java | 2 +-
...achePartitionedTxSingleThreadedSelfTest.java | 2 +-
.../cache/query/IndexingSpiQuerySelfTest.java | 4 +-
.../service/GridServiceClientNodeTest.java | 7 +-
...ridSingleSplitsNewNodesAbstractLoadTest.java | 11 +-
...idSingleSplitsNewNodesMulticastLoadTest.java | 9 +-
.../p2p/GridP2PSameClassLoaderSelfTest.java | 2 +-
.../discovery/AbstractDiscoverySelfTest.java | 19 +-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 245 +++++++++++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 79 ++++-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 18 +-
.../tcp/TcpDiscoverySpiConfigSelfTest.java | 4 +-
.../TcpDiscoverySpiFailureTimeoutSelfTest.java | 51 +--
.../testframework/junits/GridAbstractTest.java | 10 +-
.../webapp/META-INF/ignite-webapp-config.xml | 1 -
.../Cache/CacheMetricsTest.cs | 3 +-
.../IgniteConfigurationSerializerTest.cs | 1 -
.../IgniteConfigurationTest.cs | 1 -
.../Discovery/Tcp/TcpDiscoverySpi.cs | 15 -
.../IgniteConfigurationSection.xsd | 5 -
.../Datagrid/MultiTieredCacheExample.cs | 2 +-
.../ignite/p2p/GridP2PDisabledSelfTest.java | 4 +-
.../webapp2/META-INF/ignite-webapp-config.xml | 1 -
74 files changed, 1135 insertions(+), 886 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java
----------------------------------------------------------------------
diff --git a/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java b/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java
index 2008eff..33f2cdd 100644
--- a/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java
+++ b/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java
@@ -54,9 +54,9 @@ public abstract class AbstractAopTest extends GridCommonAbstractTest {
cfg.setDeploymentSpi(new LocalDeploymentSpi());
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(500);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+ cfg.setMetricsUpdateFrequency(500);
cfg.setDeploymentMode(depMode);
return cfg;
@@ -738,4 +738,4 @@ public abstract class AbstractAopTest extends GridCommonAbstractTest {
return true;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java b/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java
index b53501b..44fa48d 100644
--- a/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java
+++ b/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java
@@ -524,9 +524,7 @@ public class ExternalNonSpringAopSelfTest extends GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration();
cfg.setDeploymentSpi(new LocalDeploymentSpi());
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(500);
-
- cfg.setDeploymentMode(depMode);
+ cfg.setMetricsUpdateFrequency(500);
cfg.setDeploymentMode(depMode);
@@ -539,4 +537,4 @@ public class ExternalNonSpringAopSelfTest extends GridCommonAbstractTest {
@Override public String getTestIgniteInstanceName() {
return "ExternalAopTarget";
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/clients/src/test/resources/spring-cache.xml
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/resources/spring-cache.xml b/modules/clients/src/test/resources/spring-cache.xml
index 4dbae6e..8cbc688 100644
--- a/modules/clients/src/test/resources/spring-cache.xml
+++ b/modules/clients/src/test/resources/spring-cache.xml
@@ -148,7 +148,6 @@
<property name="bucketName" value="YOUR_BUCKET_NAME_IP_FINDER"/>
</bean>
</property>
- <property name="heartbeatFrequency" value="2000"/>
</bean>
</property>
-->
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java
index 50c09be..7dd4707 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java
@@ -29,8 +29,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
* <p>
* Node metrics for any node can be accessed via {@link ClusterNode#metrics()}
* method. Keep in mind that there will be a certain network delay (usually
- * equal to heartbeat delay) for the accuracy of node metrics. However, when accessing
- * metrics on local node {@link IgniteCluster#localNode() Grid.localNode().getMetrics()}
+ * equal to metrics update delay) for the accuracy of node metrics. However, when accessing
+ * metrics on local node {@link IgniteCluster#localNode() IgniteCluster.localNode().getMetrics()}
* the metrics are always accurate and up to date.
* <p>
* Local node metrics are registered as {@code MBean} and can be accessed from
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index bfc395d..e122ff6 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -87,9 +87,8 @@ import org.jetbrains.annotations.Nullable;
* <h1 class="header">Cluster Node Metrics</h1>
* Cluster node metrics (see {@link #metrics()}) are updated frequently for all nodes
* and can be used to get dynamic information about a node. The frequency of update
- * is often directly related to the heartbeat exchange between nodes. So if, for example,
- * default {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} is used,
- * the metrics data will be updated every {@code 2} seconds by default.
+ * is controlled by {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsUpdateFrequency()} parameter.
+ * The metrics data will be updated every {@code 2} seconds by default.
* <p>
* Grid node metrics provide information that can frequently change,
* such as Heap and Non-Heap memory utilization, CPU load, number of active and waiting
@@ -145,9 +144,9 @@ public interface ClusterNode {
* method and use it during {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} or during collision
* resolution.
* <p>
- * Node metrics are updated with some delay which is directly related to heartbeat
- * frequency. For example, when used with default
- * {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} the update will happen every {@code 2} seconds.
+ * Node metrics are updated with some delay which is controlled by
+ * {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsUpdateFrequency()} parameter.
+ * By default the update will happen every {@code 2} seconds.
*
* @return Runtime metrics snapshot for this node.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 17927b9..9f68399 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -200,6 +200,10 @@ public class IgniteConfiguration {
@SuppressWarnings("UnnecessaryBoxing")
public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000);
+ /** Default failure detection timeout for client nodes in millis. */
+ @SuppressWarnings("UnnecessaryBoxing")
+ public static final Long DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT = new Long(30_000);
+
/** Optional local Ignite instance name. */
private String igniteInstanceName;
@@ -386,6 +390,9 @@ public class IgniteConfiguration {
/** Failure detection timeout. */
private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT;
+ /** Failure detection timeout for client nodes. */
+ private Long clientFailureDetectionTimeout = DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT;
+
/** Property names to include into node attributes. */
private String[] includeProps;
@@ -491,6 +498,7 @@ public class IgniteConfiguration {
cacheSanityCheckEnabled = cfg.isCacheSanityCheckEnabled();
callbackPoolSize = cfg.getAsyncCallbackPoolSize();
classLdr = cfg.getClassLoader();
+ clientFailureDetectionTimeout = cfg.getClientFailureDetectionTimeout();
clientMode = cfg.isClientMode();
connectorCfg = cfg.getConnectorConfiguration();
consistentId = cfg.getConsistentId();
@@ -1288,20 +1296,13 @@ public class IgniteConfiguration {
}
/**
- * Gets job metrics update frequency in milliseconds.
+ * Gets Ignite metrics update frequency in milliseconds.
* <p>
* Updating metrics too frequently may have negative performance impact.
* <p>
- * The following values are accepted:
- * <ul>
- * <li>{@code -1} job metrics are never updated.</li>
- * <li>{@code 0} job metrics are updated on each job start and finish.</li>
- * <li>Positive value defines the actual update frequency. If not provided, then default value
- * {@link #DFLT_METRICS_UPDATE_FREQ} is used.</li>
- * </ul>
* If not provided, then default value {@link #DFLT_METRICS_UPDATE_FREQ} is used.
*
- * @return Job metrics update frequency in milliseconds.
+ * @return Metrics update frequency in milliseconds.
* @see #DFLT_METRICS_UPDATE_FREQ
*/
public long getMetricsUpdateFrequency() {
@@ -1309,15 +1310,13 @@ public class IgniteConfiguration {
}
/**
- * Sets job metrics update frequency in milliseconds.
+ * Sets Ignite metrics update frequency in milliseconds.
* <p>
- * If set to {@code -1} job metrics are never updated.
- * If set to {@code 0} job metrics are updated on each job start and finish.
* Positive value defines the actual update frequency.
* If not provided, then default value
* {@link #DFLT_METRICS_UPDATE_FREQ} is used.
*
- * @param metricsUpdateFreq Job metrics update frequency in milliseconds.
+ * @param metricsUpdateFreq Metrics update frequency in milliseconds.
* @return {@code this} for chaining.
*/
public IgniteConfiguration setMetricsUpdateFrequency(long metricsUpdateFreq) {
@@ -1835,6 +1834,33 @@ public class IgniteConfiguration {
}
/**
+ * Returns failure detection timeout for client nodes used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}.
+ * <p>
+ * Default is {@link #DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT}.
+ *
+ * @see #setClientFailureDetectionTimeout(long)
+ * @return Failure detection timeout for client nodes in milliseconds.
+ */
+ public Long getClientFailureDetectionTimeout() {
+ return clientFailureDetectionTimeout;
+ }
+
+ /**
+ * Sets failure detection timeout for client nodes to use in {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}.
+ * <p>
+ * Failure detection timeout is used to determine how long the communication or discovery SPIs should wait before
+ * considering a remote connection failed.
+ *
+ * @param clientFailureDetectionTimeout Failure detection timeout for client nodes in milliseconds.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setClientFailureDetectionTimeout(long clientFailureDetectionTimeout) {
+ this.clientFailureDetectionTimeout = clientFailureDetectionTimeout;
+
+ return this;
+ }
+
+ /**
* Returns failure detection timeout used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}.
* <p>
* Default is {@link #DFLT_FAILURE_DETECTION_TIMEOUT}.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index e506371..1960692 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -136,7 +136,7 @@ public interface EventType {
* Built-in event type: node metrics updated.
* <br>
* Generated when node's metrics are updated. In most cases this callback
- * is invoked with every heartbeat received from a node (including local node).
+ * is invoked with every metrics update received from a node (including local node).
* <p>
* NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
* internal Ignite events and should not be used by user-defined events.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 369ca22..e0bc4d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -855,8 +855,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
});
- if (metricsUpdateFreq > -1L)
- updateJobMetrics();
+ updateJobMetrics();
}
finally {
handlingCollision.set(Boolean.FALSE);
@@ -867,24 +866,21 @@ public class GridJobProcessor extends GridProcessorAdapter {
*
*/
private void updateJobMetrics() {
- assert metricsUpdateFreq > -1L;
+ assert metricsUpdateFreq > 0L;
- if (metricsUpdateFreq == 0L)
+ long now = U.currentTimeMillis();
+ long lastUpdate = metricsLastUpdateTstamp.get();
+
+ if (now - lastUpdate > metricsUpdateFreq && metricsLastUpdateTstamp.compareAndSet(lastUpdate, now))
updateJobMetrics0();
- else {
- long now = U.currentTimeMillis();
- long lastUpdate = metricsLastUpdateTstamp.get();
- if (now - lastUpdate > metricsUpdateFreq && metricsLastUpdateTstamp.compareAndSet(lastUpdate, now))
- updateJobMetrics0();
- }
}
/**
*
*/
private void updateJobMetrics0() {
- assert metricsUpdateFreq > -1L;
+ assert metricsUpdateFreq > 0L;
GridJobMetricsSnapshot m = new GridJobMetricsSnapshot();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 4186eb9..eb3e716 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -531,7 +531,8 @@ public class PlatformConfigurationUtils {
if (in.readBoolean())
cfg.setClientMode(in.readBoolean());
int[] eventTypes = in.readIntArray();
- if (eventTypes != null) cfg.setIncludeEventTypes(eventTypes);
+ if (eventTypes != null)
+ cfg.setIncludeEventTypes(eventTypes);
if (in.readBoolean())
cfg.setMetricsExpireTime(in.readLong());
if (in.readBoolean())
@@ -556,8 +557,10 @@ public class PlatformConfigurationUtils {
cfg.setDaemon(in.readBoolean());
if (in.readBoolean())
cfg.setLateAffinityAssignment(in.readBoolean());
- if (in.readBoolean())
+ if (in.readBoolean()) {
+ cfg.setClientFailureDetectionTimeout(in.readLong());
cfg.setFailureDetectionTimeout(in.readLong());
+ }
readCacheConfigurations(in, cfg);
readDiscoveryConfiguration(in, cfg);
@@ -752,12 +755,9 @@ public class PlatformConfigurationUtils {
disco.setReconnectCount(in.readInt());
disco.setLocalPort(in.readInt());
disco.setLocalPortRange(in.readInt());
- disco.setMaxMissedHeartbeats(in.readInt());
- disco.setMaxMissedClientHeartbeats(in.readInt());
disco.setStatisticsPrintFrequency(in.readLong());
disco.setIpFinderCleanFrequency(in.readLong());
disco.setThreadPriority(in.readInt());
- disco.setHeartbeatFrequency(in.readLong());
disco.setTopHistorySize(in.readInt());
cfg.setDiscoverySpi(disco);
@@ -960,7 +960,8 @@ public class PlatformConfigurationUtils {
w.writeLong(cfg.getMetricsUpdateFrequency());
w.writeBoolean(true);
w.writeInt(cfg.getNetworkSendRetryCount());
- w.writeBoolean(true);w.writeLong(cfg.getNetworkSendRetryDelay());
+ w.writeBoolean(true);
+ w.writeLong(cfg.getNetworkSendRetryDelay());
w.writeBoolean(true);
w.writeLong(cfg.getNetworkTimeout());
w.writeString(cfg.getWorkDirectory());
@@ -970,6 +971,7 @@ public class PlatformConfigurationUtils {
w.writeBoolean(true);
w.writeBoolean(cfg.isLateAffinityAssignment());
w.writeBoolean(true);
+ w.writeLong(cfg.getClientFailureDetectionTimeout());
w.writeLong(cfg.getFailureDetectionTimeout());
CacheConfiguration[] cacheCfg = cfg.getCacheConfiguration();
@@ -1063,17 +1065,17 @@ public class PlatformConfigurationUtils {
else
w.writeBoolean(false);
- EventStorageSpi eventStorageSpi = cfg.getEventStorageSpi();
+ EventStorageSpi evtStorageSpi = cfg.getEventStorageSpi();
- if (eventStorageSpi == null) {
+ if (evtStorageSpi == null)
w.writeByte((byte) 0);
- } else if (eventStorageSpi instanceof NoopEventStorageSpi) {
+ else if (evtStorageSpi instanceof NoopEventStorageSpi)
w.writeByte((byte) 1);
- } else if (eventStorageSpi instanceof MemoryEventStorageSpi) {
+ else if (evtStorageSpi instanceof MemoryEventStorageSpi) {
w.writeByte((byte) 2);
- w.writeLong(((MemoryEventStorageSpi)eventStorageSpi).getExpireCount());
- w.writeLong(((MemoryEventStorageSpi)eventStorageSpi).getExpireAgeMs());
+ w.writeLong(((MemoryEventStorageSpi)evtStorageSpi).getExpireCount());
+ w.writeLong(((MemoryEventStorageSpi)evtStorageSpi).getExpireAgeMs());
}
writeMemoryConfiguration(w, cfg.getMemoryConfiguration());
@@ -1135,9 +1137,8 @@ public class PlatformConfigurationUtils {
w.writeInt(ttl);
}
}
- else {
+ else
w.writeBoolean(false);
- }
w.writeLong(tcp.getSocketTimeout());
w.writeLong(tcp.getAckTimeout());
@@ -1151,12 +1152,9 @@ public class PlatformConfigurationUtils {
w.writeInt(tcp.getReconnectCount());
w.writeInt(tcp.getLocalPort());
w.writeInt(tcp.getLocalPortRange());
- w.writeInt(tcp.getMaxMissedHeartbeats());
- w.writeInt(tcp.getMaxMissedClientHeartbeats());
w.writeLong(tcp.getStatisticsPrintFrequency());
w.writeLong(tcp.getIpFinderCleanFrequency());
w.writeInt(tcp.getThreadPriority());
- w.writeLong(tcp.getHeartbeatFrequency());
w.writeInt((int)tcp.getTopHistorySize());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index ec56f4f..81f5c28 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -95,6 +95,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
private boolean failureDetectionTimeoutEnabled = true;
/**
+ * Failure detection timeout for client nodes. Initialized with the value of
+ * {@link IgniteConfiguration#getClientFailureDetectionTimeout()}.
+ */
+ private long clientFailureDetectionTimeout;
+
+ /**
* Failure detection timeout. Initialized with the value of
* {@link IgniteConfiguration#getFailureDetectionTimeout()}.
*/
@@ -648,12 +654,29 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
// Because U.currentTimeInMillis() is updated once in 10 milliseconds.
log.warning("Failure detection timeout is too low, it may lead to unpredictable behaviour " +
"[failureDetectionTimeout=" + failureDetectionTimeout + ']');
+ else if (failureDetectionTimeout <= ignite.configuration().getMetricsUpdateFrequency())
+ log.warning("'IgniteConfiguration.failureDetectionTimeout' should be greater then " +
+ "'IgniteConfiguration.metricsUpdateFrequency' to prevent unnecessary status checking.");
}
// Intentionally compare references using '!=' below
else if (ignite.configuration().getFailureDetectionTimeout() !=
IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT)
log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)");
+ clientFailureDetectionTimeout = ignite.configuration().getClientFailureDetectionTimeout();
+
+ if (clientFailureDetectionTimeout <= 0)
+ throw new IgniteSpiException("Invalid client failure detection timeout value: " +
+ clientFailureDetectionTimeout);
+ else if (clientFailureDetectionTimeout <= 10)
+ // Because U.currentTimeInMillis() is updated once in 10 milliseconds.
+ log.warning("Client failure detection timeout is too low, it may lead to unpredictable behaviour " +
+ "[clientFailureDetectionTimeout=" + clientFailureDetectionTimeout + ']');
+
+ if (clientFailureDetectionTimeout < ignite.configuration().getMetricsUpdateFrequency())
+ throw new IgniteSpiException("Inconsistent configuration " +
+ "('IgniteConfiguration.clientFailureDetectionTimeout' must be greater or equal to " +
+ "'IgniteConfiguration.metricsUpdateFrequency').");
}
/**
@@ -675,6 +698,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
}
/**
+ * Returns client failure detection timeout set to use for network related operations.
+ *
+ * @return client failure detection timeout in milliseconds or {@code 0} if the timeout is disabled.
+ */
+ public long clientFailureDetectionTimeout() {
+ return clientFailureDetectionTimeout;
+ }
+
+ /**
* Returns failure detection timeout set to use for network related operations.
*
* @return failure detection timeout in milliseconds or {@code 0} if the timeout is disabled.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index e17b0dd..c685ea9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -45,10 +45,12 @@ public class IgniteSpiOperationTimeoutHelper {
* Constructor.
*
* @param adapter SPI adapter.
+ * @param srvOp {@code True} if communicates with server node.
*/
- public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) {
+ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp) {
failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled();
- failureDetectionTimeout = adapter.failureDetectionTimeout();
+ failureDetectionTimeout = srvOp ? adapter.failureDetectionTimeout() :
+ adapter.clientFailureDetectionTimeout();
}
/**
@@ -99,4 +101,4 @@ public class IgniteSpiOperationTimeoutHelper {
return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
index 8a02225..6f2c099 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java
@@ -88,7 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
* {@link org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi JobStealingFailoverSpi}.
* Also note that job metrics update should be enabled in order for this SPI
* to work properly (i.e. {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsUpdateFrequency() IgniteConfiguration#getMetricsUpdateFrequency()}
- * should be set to {@code 0} or greater value).
+ * should be set to positive value).
* The responsibility of Job Stealing Failover SPI is to properly route <b>stolen</b>
* jobs to the nodes that initially requested (<b>stole</b>) these jobs. The
* SPI maintains a counter of how many times a jobs was stolen and
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1fedf83..be897d6 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2727,7 +2727,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
long connTimeout0 = connTimeout;
- IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this,
+ !node.isClient());
while (true) {
GridCommunicationClient client;
@@ -2918,7 +2919,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
int attempt = 1;
- IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this,
+ !node.isClient());
while (!conn) { // Reconnection on handshake timeout.
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 34ee414..b5b4c77 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -81,7 +81,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientAckResponse;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientHeartbeatMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
@@ -89,7 +89,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessa
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
@@ -272,9 +272,9 @@ class ClientImpl extends TcpDiscoveryImpl {
}
timer.schedule(
- new HeartbeatSender(),
- spi.hbFreq,
- spi.hbFreq);
+ new MetricsSender(),
+ spi.metricsUpdateFreq,
+ spi.metricsUpdateFreq);
spi.printStartInfo();
}
@@ -597,7 +597,7 @@ class ClientImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
- IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
while (true) {
boolean openSock = false;
@@ -861,13 +861,14 @@ class ClientImpl extends TcpDiscoveryImpl {
}
/**
- * Heartbeat sender.
+ * Metrics sender.
*/
- private class HeartbeatSender extends TimerTask {
+ private class MetricsSender extends TimerTask {
/** {@inheritDoc} */
@Override public void run() {
if (!spi.getSpiContext().isStopping() && sockWriter.isOnline()) {
- TcpDiscoveryClientHeartbeatMessage msg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(),
+ TcpDiscoveryClientMetricsUpdateMessage msg = new TcpDiscoveryClientMetricsUpdateMessage(
+ getLocalNodeId(),
spi.metricsProvider.metrics());
msg.client(true);
@@ -1829,8 +1830,8 @@ class ClientImpl extends TcpDiscoveryImpl {
processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg);
else if (msg instanceof TcpDiscoveryNodeFailedMessage)
processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
- else if (msg instanceof TcpDiscoveryHeartbeatMessage)
- processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+ else if (msg instanceof TcpDiscoveryMetricsUpdateMessage)
+ processMetricsUpdateMessage((TcpDiscoveryMetricsUpdateMessage)msg);
else if (msg instanceof TcpDiscoveryClientReconnectMessage)
processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
else if (msg instanceof TcpDiscoveryCustomEventMessage)
@@ -2152,7 +2153,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
* @param msg Message.
*/
- private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
+ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
if (spi.getSpiContext().isStopping())
return;
@@ -2160,16 +2161,16 @@ class ClientImpl extends TcpDiscoveryImpl {
assert msg.senderNodeId() != null;
if (log.isDebugEnabled())
- log.debug("Received heartbeat response: " + msg);
+ log.debug("Received metrics response: " + msg);
}
else {
long tstamp = U.currentTimeMillis();
if (msg.hasMetrics()) {
- for (Map.Entry<UUID, TcpDiscoveryHeartbeatMessage.MetricsSet> e : msg.metrics().entrySet()) {
+ for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();
- TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue();
+ TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();
Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 47c13e1..6a10ec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -106,7 +106,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientAckResponse;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientHeartbeatMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
@@ -116,7 +116,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
@@ -612,7 +612,8 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
- IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi,
+ clientNodeId == null);
if (F.contains(spi.locNodeAddrs, addr)) {
if (clientNodeId == null)
@@ -991,7 +992,9 @@ class ServerImpl extends TcpDiscoveryImpl {
for (InetSocketAddress addr : addrs) {
try {
- Integer res = sendMessageDirectly(joinReq, addr);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
+
+ Integer res = sendMessageDirectly(joinReq, addr, timeoutHelper);
assert res != null;
@@ -1104,10 +1107,12 @@ class ServerImpl extends TcpDiscoveryImpl {
*
* @param msg Message to send.
* @param addr Address to send message to.
+ * @param timeoutHelper Operation timeout helper.
* @return Response read from the recipient or {@code null} if no response is supposed.
* @throws IgniteSpiException If an error occurs.
*/
- @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr)
+ @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr,
+ IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IgniteSpiException {
assert msg != null;
assert addr != null;
@@ -1124,8 +1129,6 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
- IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
-
int reconCnt = 0;
while (true){
@@ -1731,7 +1734,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* @return {@code True} if recordable in debug mode.
*/
private boolean recordable(TcpDiscoveryAbstractMessage msg) {
- return !(msg instanceof TcpDiscoveryHeartbeatMessage) &&
+ return !(msg instanceof TcpDiscoveryMetricsUpdateMessage) &&
!(msg instanceof TcpDiscoveryStatusCheckMessage) &&
!(msg instanceof TcpDiscoveryDiscardMessage) &&
!(msg instanceof TcpDiscoveryConnectionCheckMessage);
@@ -1759,7 +1762,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message.
* @param nodeId Node ID.
*/
- private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
+ private static void removeMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
msg.removeMetrics(nodeId);
msg.removeCacheMetrics(nodeId);
}
@@ -2384,11 +2387,11 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Last time status message has been sent. */
private long lastTimeStatusMsgSent;
- /** Incoming heartbeats check frequency. */
- private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50;
+ /** Incoming metrics check frequency. */
+ private long metricsCheckFreq = 3 * spi.metricsUpdateFreq + 50;
- /** Last time heartbeat message has been sent. */
- private long lastTimeHbMsgSent;
+ /** Last time metrics update message has been sent. */
+ private long lastTimeMetricsUpdateMsgSent;
/** Time when the last status message has been sent. */
private long lastTimeConnCheckMsgSent;
@@ -2483,7 +2486,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (spi.failureDetectionTimeoutEnabled())
connCheckThreshold = spi.failureDetectionTimeout();
else
- connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.getHeartbeatFrequency());
+ connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.metricsUpdateFreq);
for (int i = 3; i > 0; i--) {
connCheckFreq = connCheckThreshold / i;
@@ -2502,7 +2505,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to process.
*/
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
- sendHeartbeatMessage();
+ sendMetricsUpdateMessage();
DebugLogger log = messageLogger(msg);
@@ -2555,8 +2558,8 @@ class ServerImpl extends TcpDiscoveryImpl {
else if (msg instanceof TcpDiscoveryNodeFailedMessage)
processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg);
- else if (msg instanceof TcpDiscoveryHeartbeatMessage)
- processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg);
+ else if (msg instanceof TcpDiscoveryMetricsUpdateMessage)
+ processMetricsUpdateMessage((TcpDiscoveryMetricsUpdateMessage)msg);
else if (msg instanceof TcpDiscoveryStatusCheckMessage)
processStatusCheckMessage((TcpDiscoveryStatusCheckMessage)msg);
@@ -2594,9 +2597,9 @@ class ServerImpl extends TcpDiscoveryImpl {
checkConnection();
- sendHeartbeatMessage();
+ sendMetricsUpdateMessage();
- checkHeartbeatsReceiving();
+ checkMetricsReceiving();
checkPendingCustomMessages();
@@ -2750,7 +2753,7 @@ class ServerImpl extends TcpDiscoveryImpl {
while (true) {
if (sock == null) {
if (timeoutHelper == null)
- timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
nextNodeExists = false;
@@ -2918,7 +2921,7 @@ class ServerImpl extends TcpDiscoveryImpl {
pendingMsgs.discardId, pendingMsgs.customDiscardId);
if (timeoutHelper == null)
- timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
try {
spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk(
@@ -2958,7 +2961,7 @@ class ServerImpl extends TcpDiscoveryImpl {
long tstamp = U.currentTimeMillis();
if (timeoutHelper == null)
- timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
if (!failedNodes.isEmpty()) {
for (TcpDiscoveryNode failedNode : failedNodes) {
@@ -3817,7 +3820,9 @@ class ServerImpl extends TcpDiscoveryImpl {
for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) {
try {
- sendMessageDirectly(msg, addr);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
+
+ sendMessageDirectly(msg, addr, timeoutHelper);
node.lastSuccessfulAddress(addr);
@@ -3853,7 +3858,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (node != null) {
node.clientRouterNodeId(msg.routerNodeId());
- node.aliveCheck(spi.maxMissedClientHbs);
+ node.clientAliveTime(spi.clientFailureDetectionTimeout());
}
if (isLocalNodeCoordinator()) {
@@ -4083,7 +4088,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.client())
- node.aliveCheck(spi.maxMissedClientHbs);
+ node.clientAliveTime(spi.clientFailureDetectionTimeout());
boolean topChanged = ring.add(node);
@@ -4830,7 +4835,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null &&
- U.currentTimeMillis() - locNode.lastUpdateTime() < spi.hbFreq) {
+ U.currentTimeMillis() - locNode.lastUpdateTime() < spi.metricsUpdateFreq) {
if (log.isDebugEnabled())
log.debug("Status check message discarded (local node receives updates).");
@@ -4873,11 +4878,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Processes regular heartbeat message.
+ * Processes regular metrics update message.
*
- * @param msg Heartbeat message.
+ * @param msg Metrics update message.
*/
- private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) {
+ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
assert msg != null;
assert !msg.client();
@@ -4886,7 +4891,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (ring.node(msg.creatorNodeId()) == null) {
if (log.isDebugEnabled())
- log.debug("Discarding heartbeat message issued by unknown node [msg=" + msg +
+ log.debug("Discarding metrics update message issued by unknown node [msg=" + msg +
", ring=" + ring + ']');
return;
@@ -4894,14 +4899,14 @@ class ServerImpl extends TcpDiscoveryImpl {
if (isLocalNodeCoordinator() && !locNodeId.equals(msg.creatorNodeId())) {
if (log.isDebugEnabled())
- log.debug("Discarding heartbeat message issued by non-coordinator node: " + msg);
+ log.debug("Discarding metrics update message issued by non-coordinator node: " + msg);
return;
}
if (!isLocalNodeCoordinator() && locNodeId.equals(msg.creatorNodeId())) {
if (log.isDebugEnabled())
- log.debug("Discarding heartbeat message issued by local node (node is no more coordinator): " +
+ log.debug("Discarding metrics update message issued by local node (node is no more coordinator): " +
msg);
return;
@@ -4909,7 +4914,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) {
if (log.isTraceEnabled())
- log.trace("Discarding heartbeat message that has made two passes: " + msg);
+ log.trace("Discarding metrics update message that has made two passes: " + msg);
return;
}
@@ -4918,10 +4923,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (spiStateCopy() == CONNECTED) {
if (msg.hasMetrics()) {
- for (Map.Entry<UUID, TcpDiscoveryHeartbeatMessage.MetricsSet> e : msg.metrics().entrySet()) {
+ for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();
- TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue();
+ TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();
Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
@@ -4960,11 +4965,11 @@ class ServerImpl extends TcpDiscoveryImpl {
for (TcpDiscoveryNode clientNode : ring.clientNodes()) {
if (clientNode.visible()) {
if (clientNodeIds.contains(clientNode.id()))
- clientNode.aliveCheck(spi.maxMissedClientHbs);
+ clientNode.clientAliveTime(spi.clientFailureDetectionTimeout());
else {
- int aliveCheck = clientNode.decrementAliveCheck();
+ boolean aliveCheck = clientNode.isClientAlive();
- if (aliveCheck <= 0 && isLocalNodeCoordinator()) {
+ if (!aliveCheck && isLocalNodeCoordinator()) {
boolean failedNode;
synchronized (mux) {
@@ -4972,6 +4977,12 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (!failedNode) {
+ U.warn(log, "Failing client node due to not receiving metrics updates " +
+ "from client node within " +
+ "'IgniteConfiguration.clientFailureDetectionTimeout' " +
+ "(consider increasing configuration property) " +
+ "[timeout=" + spi.clientFailureDetectionTimeout() + ", node=" + clientNode + ']');
+
TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(
locNodeId, clientNode.id(), clientNode.internalOrder());
@@ -5027,7 +5038,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* @param msg Message.
*/
- private boolean hasMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
+ private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
}
@@ -5338,34 +5349,34 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Sends heartbeat message if needed.
+ * Sends metrics update message if needed.
*/
- private void sendHeartbeatMessage() {
- long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
+ private void sendMetricsUpdateMessage() {
+ long elapsed = (lastTimeMetricsUpdateMsgSent + spi.metricsUpdateFreq) - U.currentTimeMillis();
if (elapsed > 0 || !isLocalNodeCoordinator())
return;
- TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
+ TcpDiscoveryMetricsUpdateMessage msg = new TcpDiscoveryMetricsUpdateMessage(getConfiguredNodeId());
msg.verify(getLocalNodeId());
msgWorker.addMessage(msg);
- lastTimeHbMsgSent = U.currentTimeMillis();
+ lastTimeMetricsUpdateMsgSent = U.currentTimeMillis();
}
/**
- * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than
- * {@link TcpDiscoveryStatusCheckMessage} is sent across the ring.
+ * Checks the last time a metrics update message was received. If more than {@code metricsCheckFreq} has elapsed,
+ * then {@link TcpDiscoveryStatusCheckMessage} is sent across the ring.
*/
- private void checkHeartbeatsReceiving() {
+ private void checkMetricsReceiving() {
if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
lastTimeStatusMsgSent = locNode.lastUpdateTime();
long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime);
- long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis();
+ long elapsed = (updateTime + metricsCheckFreq) - U.currentTimeMillis();
if (elapsed > 0)
return;
@@ -5548,6 +5559,8 @@ class ServerImpl extends TcpDiscoveryImpl {
ClientMessageWorker clientMsgWrk = null;
+ boolean srvSock;
+
try {
InputStream in;
@@ -5618,7 +5631,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
IgniteSpiOperationTimeoutHelper timeoutHelper =
- new IgniteSpiOperationTimeoutHelper(spi);
+ new IgniteSpiOperationTimeoutHelper(spi, true);
if (req.clientNodeId() != null) {
ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
@@ -5638,6 +5651,8 @@ class ServerImpl extends TcpDiscoveryImpl {
// Handshake.
TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
+ srvSock = !req.client();
+
UUID nodeId = req.creatorNodeId();
this.nodeId = nodeId;
@@ -5648,8 +5663,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (req.client())
res.clientAck(true);
- spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ?
- spi.failureDetectionTimeout() : spi.getSocketTimeout());
+ spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock));
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
// the local node sends a handshake request message on the loopback address, so we get here.
@@ -5764,8 +5778,7 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
- spi.getSocketTimeout();
+ long sockTimeout = spi.getEffectiveSocketTimeout(srvSock);
while (!isInterrupted()) {
try {
@@ -5950,10 +5963,10 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
- TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null;
+ TcpDiscoveryClientMetricsUpdateMessage metricsUpdateMsg = null;
- if (msg instanceof TcpDiscoveryClientHeartbeatMessage)
- heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg;
+ if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage)
+ metricsUpdateMsg = (TcpDiscoveryClientMetricsUpdateMessage)msg;
else
msgWorker.addMessage(msg);
@@ -5968,8 +5981,8 @@ class ServerImpl extends TcpDiscoveryImpl {
else
spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
- if (heartbeatMsg != null)
- processClientHeartbeatMessage(heartbeatMsg);
+ if (metricsUpdateMsg != null)
+ processClientMetricsUpdateMessage(metricsUpdateMsg);
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -6037,11 +6050,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * Processes client heartbeat message.
+ * Processes client metrics update message.
*
- * @param msg Heartbeat message.
+ * @param msg Client metrics update message.
*/
- private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) {
+ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMessage msg) {
assert msg.client();
ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId());
@@ -6049,7 +6062,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (wrk != null)
wrk.metrics(msg.metrics());
else if (log.isDebugEnabled())
- log.debug("Received heartbeat message from unknown client node: " + msg);
+ log.debug("Received client metrics update message from unknown client node: " + msg);
}
/**
@@ -6286,7 +6299,7 @@ class ServerImpl extends TcpDiscoveryImpl {
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
- spi.failureDetectionTimeout() : spi.getSocketTimeout());
+ spi.clientFailureDetectionTimeout() : spi.getSocketTimeout());
}
}
else {
@@ -6296,8 +6309,7 @@ class ServerImpl extends TcpDiscoveryImpl {
assert topologyInitialized(msg) : msg;
- spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ?
- spi.failureDetectionTimeout() : spi.getSocketTimeout());
+ spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false));
}
boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage &&
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 25804c7..46d6f06 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -138,7 +138,6 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean;
* configuration parameters may be used. As an example, for stable low-latency networks the following more aggressive
* settings are recommended (which allows failure detection time ~200ms):
* <ul>
- * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)}) - 100ms</li>
* <li>Socket timeout (see {@link #setSocketTimeout(long)}) - 200ms</li>
* <li>Message acknowledgement timeout (see {@link #setAckTimeout(long)}) - 50ms</li>
* </ul>
@@ -166,8 +165,6 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean;
* <li>Local port to bind to (see {@link #setLocalPort(int)})</li>
* <li>Local port range to try binding to if previous ports are in use
* (see {@link #setLocalPortRange(int)})</li>
- * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)})</li>
- * <li>Max missed heartbeats (see {@link #setMaxMissedHeartbeats(int)})</li>
* <li>Number of times node tries to (re)establish connection to another node
* (see {@link #setReconnectCount(int)})</li>
* <li>Network timeout (see {@link #setNetworkTimeout(long)})</li>
@@ -241,8 +238,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
/** Default value for thread priority (value is <tt>10</tt>). */
public static final int DFLT_THREAD_PRI = 10;
- /** Default heartbeat messages issuing frequency (value is <tt>2000ms</tt>). */
- public static final long DFLT_HEARTBEAT_FREQ = 2000;
+ /**
+ * Default metrics update messages issuing frequency
+ * (value is {@link IgniteConfiguration#DFLT_METRICS_UPDATE_FREQ}).
+ */
+ public static final long DFLT_METRICS_UPDATE_FREQ = IgniteConfiguration.DFLT_METRICS_UPDATE_FREQ;
/** Default size of topology snapshots history. */
public static final int DFLT_TOP_HISTORY_SIZE = 1000;
@@ -262,12 +262,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
/** Default reconnect attempts count (value is <tt>10</tt>). */
public static final int DFLT_RECONNECT_CNT = 10;
- /** Default max heartbeats count node can miss without initiating status check (value is <tt>1</tt>). */
- public static final int DFLT_MAX_MISSED_HEARTBEATS = 1;
-
- /** Default max heartbeats count node can miss without failing client node (value is <tt>5</tt>). */
- public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5;
-
/** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */
public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000;
@@ -302,8 +296,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
/** Thread priority for all threads started by SPI. */
protected int threadPri = DFLT_THREAD_PRI;
- /** Heartbeat messages issuing frequency. */
- protected long hbFreq = DFLT_HEARTBEAT_FREQ;
+ /** Metrics update messages issuing frequency. */
+ protected long metricsUpdateFreq = DFLT_METRICS_UPDATE_FREQ;
/** Size of topology snapshots history. */
protected int topHistSize = DFLT_TOP_HISTORY_SIZE;
@@ -361,12 +355,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
/** Maximum message acknowledgement timeout. */
private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
- /** Max heartbeats count node can miss without initiating status check. */
- protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
-
- /** Max heartbeats count node can miss without failing client node. */
- protected int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
-
/** IP finder clean frequency. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
protected long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ;
@@ -731,56 +719,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
}
/**
- * Gets max heartbeats count node can miss without initiating status check.
- *
- * @return Max missed heartbeats.
- */
- public int getMaxMissedHeartbeats() {
- return maxMissedHbs;
- }
-
- /**
- * Sets max heartbeats count node can miss without initiating status check.
- * <p>
- * If not provided, default value is {@link #DFLT_MAX_MISSED_HEARTBEATS}.
- * <p>
- * Affected server nodes only.
- *
- * @param maxMissedHbs Max missed heartbeats.
- * @return {@code this} for chaining.
- */
- @IgniteSpiConfiguration(optional = true)
- public TcpDiscoverySpi setMaxMissedHeartbeats(int maxMissedHbs) {
- this.maxMissedHbs = maxMissedHbs;
-
- return this;
- }
-
- /**
- * Gets max heartbeats count node can miss without failing client node.
- *
- * @return Max missed client heartbeats.
- */
- public int getMaxMissedClientHeartbeats() {
- return maxMissedClientHbs;
- }
-
- /**
- * Sets max heartbeats count node can miss without failing client node.
- * <p>
- * If not provided, default value is {@link #DFLT_MAX_MISSED_CLIENT_HEARTBEATS}.
- *
- * @param maxMissedClientHbs Max missed client heartbeats.
- * @return {@code this} for chaining.
- */
- @IgniteSpiConfiguration(optional = true)
- public TcpDiscoverySpi setMaxMissedClientHeartbeats(int maxMissedClientHbs) {
- this.maxMissedClientHbs = maxMissedClientHbs;
-
- return this;
- }
-
- /**
* Gets statistics print frequency.
*
* @return Statistics print frequency in milliseconds.
@@ -966,22 +904,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
}
/**
- * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages
- * in configurable time interval to other nodes to notify them about its state.
- * <p>
- * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}.
- *
- * @param hbFreq Heartbeat frequency in milliseconds.
- * @return {@code this} for chaining.
- */
- @IgniteSpiConfiguration(optional = true)
- public TcpDiscoverySpi setHeartbeatFrequency(long hbFreq) {
- this.hbFreq = hbFreq;
-
- return this;
- }
-
- /**
* @return Size of topology snapshots history.
*/
public long getTopHistorySize() {
@@ -1180,6 +1102,20 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
}
/**
+ * Gets the effective socket timeout, taking the (client) failure detection timeout into account.
+ *
+ * @param srvrOperation {@code True} if the socket is connected to a server node,
+ * {@code false} if the socket is connected to a client node.
+ * @return Resulting socket timeout.
+ */
+ public long getEffectiveSocketTimeout(boolean srvrOperation) {
+ if (failureDetectionTimeoutEnabled())
+ return srvrOperation ? failureDetectionTimeout() : clientFailureDetectionTimeout();
+ else
+ return sockTimeout;
+ }
+
+ /**
* Gets message acknowledgement timeout.
*
* @return Message acknowledgement timeout.
@@ -1207,19 +1143,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
}
/**
- * Gets delay between heartbeat messages sent by coordinator.
- *
- * @return Time period in milliseconds.
- */
- public long getHeartbeatFrequency() {
- return hbFreq;
- }
-
- /**
* Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation).
*
* @return IPFinder (string representation).
- */public String getIpFinderFormatted() {
+ */
+ public String getIpFinderFormatted() {
return ipFinder.toString();
}
@@ -1939,6 +1867,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
impl = new ServerImpl(this);
}
+ metricsUpdateFreq = ignite.configuration().getMetricsUpdateFrequency();
+
if (!failureDetectionTimeoutEnabled()) {
assertParameter(sockTimeout > 0, "sockTimeout > 0");
assertParameter(ackTimeout > 0, "ackTimeout > 0");
@@ -1948,14 +1878,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
assertParameter(netTimeout > 0, "networkTimeout > 0");
assertParameter(ipFinder != null, "ipFinder != null");
- assertParameter(hbFreq > 0, "heartbeatFreq > 0");
+ assertParameter(metricsUpdateFreq > 0, "metricsUpdateFreq > 0" +
+ " (inited from igniteConfiguration.metricsUpdateFrequency)");
assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
assertParameter(locPort > 1023, "localPort > 1023");
assertParameter(locPortRange >= 0, "localPortRange >= 0");
assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff");
- assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
- assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0");
assertParameter(threadPri > 0, "threadPri > 0");
assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0");
@@ -2000,8 +1929,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
log.debug(configInfo("ipFinder", ipFinder));
log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
- log.debug(configInfo("heartbeatFreq", hbFreq));
- log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs));
+ log.debug(configInfo("metricsUpdateFreq", metricsUpdateFreq));
log.debug(configInfo("statsPrintFreq", statsPrintFreq));
}
@@ -2336,21 +2264,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
}
/** {@inheritDoc} */
- @Override public long getHeartbeatFrequency() {
- return TcpDiscoverySpi.this.getHeartbeatFrequency();
- }
-
- /** {@inheritDoc} */
- @Override public int getMaxMissedHeartbeats() {
- return TcpDiscoverySpi.this.getMaxMissedHeartbeats();
- }
-
- /** {@inheritDoc} */
- @Override public int getMaxMissedClientHeartbeats() {
- return TcpDiscoverySpi.this.getMaxMissedClientHeartbeats();
- }
-
- /** {@inheritDoc} */
@Override public long getStatisticsPrintFrequency() {
return TcpDiscoverySpi.this.getStatisticsPrintFrequency();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
index 1427929..a05ecde 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java
@@ -28,14 +28,6 @@ import org.jetbrains.annotations.Nullable;
*/
public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
/**
- * Gets delay between heartbeat messages sent by coordinator.
- *
- * @return Time period in milliseconds.
- */
- @MXBeanDescription("Heartbeat frequency.")
- public long getHeartbeatFrequency();
-
- /**
* Gets current SPI state.
*
* @return Current SPI state.
@@ -84,22 +76,6 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
public int getLocalPortRange();
/**
- * Gets max heartbeats count node can miss without initiating status check.
- *
- * @return Max missed heartbeats.
- */
- @MXBeanDescription("Max missed heartbeats.")
- public int getMaxMissedHeartbeats();
-
- /**
- * Gets max heartbeats count node can miss without failing client node.
- *
- * @return Max missed client heartbeats.
- */
- @MXBeanDescription("Max missed client heartbeats.")
- public int getMaxMissedClientHeartbeats();
-
- /**
* Gets thread priority. All threads within SPI will be started with it.
*
* @return Thread priority.
@@ -281,4 +257,4 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean {
*/
@MXBeanDescription("Client mode.")
public boolean isClientMode() throws IllegalStateException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index d778854..6882821 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -102,7 +102,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
/** Node order in the topology (internal). */
private volatile long intOrder;
- /** The most recent time when heartbeat message was received from the node. */
+ /** The most recent time when metrics update message was received from the node. */
@GridToStringExclude
private volatile long lastUpdateTime = U.currentTimeMillis();
@@ -123,9 +123,9 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
/** Version. */
private IgniteProductVersion ver;
- /** Alive check (used by clients). */
+ /** Alive check time (used by clients). */
@GridToStringExclude
- private transient int aliveCheck;
+ private transient long aliveCheckTime;
/** Client router node ID. */
@GridToStringExclude
@@ -291,9 +291,8 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
* Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated
* and provide up to date information about caches.
* <p>
- * Cache metrics are updated with some delay which is directly related to heartbeat
- * frequency. For example, when used with default
- * {@link TcpDiscoverySpi} the update will happen every {@code 2} seconds.
+ * Cache metrics are updated with some delay which is directly related to metrics update
+ * frequency. For example, by default the update will happen every {@code 2} seconds.
*
* @return Runtime metrics snapshots for this node.
*/
@@ -414,7 +413,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
/**
* Gets node last update time.
*
- * @return Time of the last heartbeat.
+ * @return Time of the last metrics update.
*/
public long lastUpdateTime() {
return lastUpdateTime;
@@ -473,23 +472,25 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
}
/**
- * Decrements alive check value and returns new one.
+ * Checks whether the client's alive check deadline has not yet passed.
*
- * @return Alive check value.
+ * @return {@code true} if the client is alive, {@code false} otherwise.
*/
- public int decrementAliveCheck() {
- assert isClient();
+ public boolean isClientAlive() {
+ assert isClient() : this;
- return --aliveCheck;
+ return (aliveCheckTime - U.currentTimeMillis()) >= 0;
}
/**
- * @param aliveCheck Alive check value.
+ * Sets the client alive check deadline to the current time plus {@code aliveTime}.
+ *
+ * @param aliveTime Interval (in milliseconds) during which the client is considered alive.
*/
- public void aliveCheck(int aliveCheck) {
- assert isClient();
+ public void clientAliveTime(long aliveTime) {
+ assert isClient() : this;
- this.aliveCheck = aliveCheck;
+ this.aliveCheckTime = U.currentTimeMillis() + aliveTime;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
deleted file mode 100644
index ade5468..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp.messages;
-
-import java.util.UUID;
-import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.internal.ClusterMetricsSnapshot;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Heartbeat message.
- * <p>
- * Client sends his heartbeats in this message.
- */
-public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final byte[] metrics;
-
- /**
- * Constructor.
- *
- * @param creatorNodeId Creator node.
- * @param metrics Metrics.
- */
- public TcpDiscoveryClientHeartbeatMessage(UUID creatorNodeId, ClusterMetrics metrics) {
- super(creatorNodeId);
-
- this.metrics = ClusterMetricsSnapshot.serialize(metrics);
- }
-
- /**
- * Gets metrics map.
- *
- * @return Metrics map.
- */
- public ClusterMetrics metrics() {
- return ClusterMetricsSnapshot.deserialize(metrics, 0);
- }
-
- /** {@inheritDoc} */
- @Override public boolean highPriority() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean traceLogLevel() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TcpDiscoveryClientHeartbeatMessage.class, this, "super", super.toString());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
new file mode 100644
index 0000000..b56cd01
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Metrics update message.
+ * <p>
+ * Client node sends its metrics to the cluster in this message.
+ */
+public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Metrics serialized with {@link ClusterMetricsSnapshot#serialize}. */
+ private final byte[] metrics;
+
+ /**
+ * Constructor.
+ *
+ * @param creatorNodeId ID of the node that created this message.
+ * @param metrics Metrics to serialize into this message.
+ */
+ public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics metrics) {
+ super(creatorNodeId);
+
+ this.metrics = ClusterMetricsSnapshot.serialize(metrics);
+ }
+
+ /**
+ * Gets metrics deserialized from this message (a new snapshot on every call).
+ *
+ * @return Metrics.
+ */
+ public ClusterMetrics metrics() {
+ return ClusterMetricsSnapshot.deserialize(metrics, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean highPriority() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean traceLogLevel() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryClientMetricsUpdateMessage.class, this, "super", super.toString());
+ }
+}
\ No newline at end of file