You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/22 07:54:59 UTC
incubator-ignite git commit: ignite-752: fixed the rest of review notes
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-752 c453ab8dc -> c399a828f
ignite-752: fixed the rest of review notes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c399a828
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c399a828
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c399a828
Branch: refs/heads/ignite-752
Commit: c399a828f51644dc0991a4d73b195dc900a32ec5
Parents: c453ab8
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 22 08:54:48 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 22 08:54:48 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 7 +-
.../IgniteSpiOperationTimeoutController.java | 102 --------------
.../spi/IgniteSpiOperationTimeoutException.java | 2 +-
.../spi/IgniteSpiOperationTimeoutHelper.java | 102 ++++++++++++++
.../communication/tcp/TcpCommunicationSpi.java | 20 +--
.../ignite/spi/discovery/tcp/ClientImpl.java | 12 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 132 +++++++++----------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 8 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 4 +-
...TcpDiscoverySpiFailureThresholdSelfTest.java | 10 +-
10 files changed, 202 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 33a250d..5e8f061 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -599,7 +599,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
if (failureDetectionThresholdEnabled) {
failureDetectionThreshold = ignite.configuration().getFailureDetectionThreshold();
- assertParameter(failureDetectionThreshold > 0, "failureDetectionThreshold > 0");
+ if (failureDetectionThreshold <= 0)
+ throw new IgniteSpiException("Invalid failure detection threshold value: " + failureDetectionThreshold);
+ else if (failureDetectionThreshold <= 10)
+ // Because U.currentTimeInMillis() is updated once in 10 milliseconds.
+ log.warning("Failure detection threshold is too low, it may lead to unpredictable behaviour " +
+ "[failureDetectionThreshold=" + failureDetectionThreshold + ']');
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
deleted file mode 100644
index 6213893..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.spi;
-
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.net.*;
-
-/**
- * Object that incorporates logic that determines a timeout value for the next network related operation and checks
- * whether a failure detection threshold is reached or not.
- *
- * A new instance of the class should be created for every complex network based operations that usually consists of
- * request and response parts.
- */
-public class IgniteSpiOperationTimeoutController {
- /** */
- private long lastOperStartTs;
-
- /** */
- private long timeout;
-
- /** */
- private final boolean failureDetectionThresholdEnabled;
-
- /** */
- private final long failureDetectionThreshold;
-
- /**
- * Constructor.
- *
- * @param adapter SPI adapter.
- */
- public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) {
- failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
- failureDetectionThreshold = adapter.failureDetectionThreshold();
- }
-
- /**
- * Returns a timeout value to use for the next network operation.
- *
- * If failure detection threshold is enabled then the returned value is a portion of time left since the last time
- * this method is called. If the threshold is disabled then {@code dfltTimeout} is returned.
- *
- * @param dfltTimeout Timeout to use if failure detection threshold is disabled.
- * @return Timeout in milliseconds.
- * @throws IgniteSpiOperationTimeoutException If failure detection threshold is reached for an operation that uses
- * this {@code IgniteSpiOperationTimeoutController}.
- */
- public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
- if (!failureDetectionThresholdEnabled)
- return dfltTimeout;
-
- if (lastOperStartTs == 0) {
- timeout = failureDetectionThreshold;
- lastOperStartTs = U.currentTimeMillis();
- }
- else {
- long curTs = U.currentTimeMillis();
-
- timeout = timeout - (curTs - lastOperStartTs);
-
- lastOperStartTs = curTs;
-
- if (timeout <= 0)
- throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
- "'failureDetectionThreshold' configuration property or set SPI specific timeouts" +
- " manually. Current failure detection threshold: " + failureDetectionThreshold);
- }
-
- return timeout;
- }
-
- /**
- * Checks whether the given {@link Exception} is generated because failure detection threshold has been reached.
- *
- * @param e Exception.
- * @return {@code true} if failure detection threshold is reached, {@code false} otherwise.
- */
- public boolean checkFailureDetectionThresholdReached(Exception e) {
- if (!failureDetectionThresholdEnabled)
- return false;
-
- return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
- X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
index 1ea05fd..235fd2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
@@ -27,7 +27,7 @@ import org.apache.ignite.configuration.*;
* {@link TcpCommunicationSpi}.
*
* For more information refer to {@link IgniteConfiguration#setFailureDetectionThreshold(long)} and
- * {@link IgniteSpiOperationTimeoutController}.
+ * {@link IgniteSpiOperationTimeoutHelper}.
*/
public class IgniteSpiOperationTimeoutException extends IgniteCheckedException {
/** */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
new file mode 100644
index 0000000..03858d9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.net.*;
+
+/**
+ * Helper that computes the timeout to use for the next network related operation and checks whether the failure
+ * detection threshold has been reached.
+ *
+ * A new instance of the class should be created for every complex network-based operation, which usually consists
+ * of request and response parts.
+ */
+public class IgniteSpiOperationTimeoutHelper {
+ /** Timestamp taken on the previous {@link #nextTimeoutChunk(long)} call; {@code 0} until the first call. */
+ private long lastOperStartTs;
+
+ /** Time budget in milliseconds that is still left for the operation tracked by this helper. */
+ private long timeout;
+
+ /** Whether failure detection threshold is enabled, as reported by the SPI adapter at construction time. */
+ private final boolean failureDetectionThresholdEnabled;
+
+ /** Failure detection threshold value, as reported by the SPI adapter at construction time. */
+ private final long failureDetectionThreshold;
+
+ /**
+ * Creates a helper that copies the failure detection threshold settings of the given SPI adapter.
+ *
+ * @param adapter SPI adapter to read the failure detection threshold settings from.
+ */
+ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) {
+ failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
+ failureDetectionThreshold = adapter.failureDetectionThreshold();
+ }
+
+ /**
+ * Returns a timeout value to use for the next network operation.
+ *
+ * If failure detection threshold is enabled then the first call returns the whole threshold and each later call
+ * returns the time still left of it. If the threshold is disabled then {@code dfltTimeout} is returned.
+ *
+ * @param dfltTimeout Timeout to use if failure detection threshold is disabled.
+ * @return Timeout in milliseconds.
+ * @throws IgniteSpiOperationTimeoutException If failure detection threshold is reached for an operation that uses
+ * this {@code IgniteSpiOperationTimeoutHelper}.
+ */
+ public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
+ if (!failureDetectionThresholdEnabled)
+ return dfltTimeout;
+
+ if (lastOperStartTs == 0) { // No previous call recorded: start with the whole threshold.
+ timeout = failureDetectionThreshold;
+ lastOperStartTs = U.currentTimeMillis();
+ }
+ else {
+ long curTs = U.currentTimeMillis();
+
+ timeout = timeout - (curTs - lastOperStartTs); // Subtract the time elapsed since the previous call.
+
+ lastOperStartTs = curTs;
+
+ if (timeout <= 0)
+ throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase " +
+ "'failureDetectionThreshold' configuration property or set SPI specific timeouts" +
+ " manually. Current failure detection threshold: " + failureDetectionThreshold);
+ }
+
+ return timeout;
+ }
+
+ /**
+ * Checks whether the given {@link Exception} is generated because failure detection threshold has been reached.
+ *
+ * @param e Exception to inspect.
+ * @return {@code true} if the threshold is enabled and {@code e} is recognized as a timeout, {@code false} otherwise.
+ */
+ public boolean checkThresholdReached(Exception e) {
+ if (!failureDetectionThresholdEnabled)
+ return false;
+
+ return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
+ X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1f09f05..b24d424 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1941,7 +1941,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long connTimeout0 = connTimeout;
- IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
while (true) {
GridCommunicationClient client;
@@ -1949,12 +1949,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
try {
client = new GridShmemCommunicationClient(metricsLsnr,
port,
- timeoutCtrl.nextTimeoutChunk(connTimeout),
+ timeoutHelper.nextTimeoutChunk(connTimeout),
log,
getSpiContext().messageFormatter());
}
catch (IgniteCheckedException e) {
- if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ if (timeoutHelper.checkThresholdReached(e))
throw e;
// Reconnect for the second time, if connection is not established.
@@ -1968,13 +1968,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
try {
- safeHandshake(client, null, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
+ safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
}
catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
client.forceClose();
if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
- timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+ timeoutHelper.checkThresholdReached(e))) {
log.debug("Handshake timed out (failure threshold reached) [failureDetectionThreshold=" +
failureDetectionThreshold() + ", err=" + e.getMessage() + ", client=" + client + ']');
@@ -2100,7 +2100,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
int attempt = 1;
- IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(this);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this);
while (!conn) { // Reconnection on handshake timeout.
try {
@@ -2128,9 +2128,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
long rcvCnt = -1;
try {
- ch.socket().connect(addr, (int)timeoutCtrl.nextTimeoutChunk(connTimeout));
+ ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
- rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutCtrl.nextTimeoutChunk(connTimeout0));
+ rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
if (rcvCnt == -1)
return null;
@@ -2172,7 +2172,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
if (failureDetectionThresholdEnabled() && (e instanceof HandshakeTimeoutException ||
- timeoutCtrl.checkFailureDetectionThresholdReached(e))) {
+ timeoutHelper.checkThresholdReached(e))) {
String msg = "Handshake timed out (failure detection threshold is reached) " +
"[failureDetectionThreshold=" + failureDetectionThreshold() + ", addr=" + addr + ']';
@@ -2240,7 +2240,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
- boolean failureDetThrReached = timeoutCtrl.checkFailureDetectionThresholdReached(e);
+ boolean failureDetThrReached = timeoutHelper.checkThresholdReached(e);
if (failureDetThrReached)
LT.warn(log, null, "Connect timed out (consider increasing 'failureDetectionThreshold' " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 196c1b3..e0d1741 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -488,7 +488,7 @@ class ClientImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
- IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
while (true) {
boolean openSock = false;
@@ -498,7 +498,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr, timeoutCtrl);
+ sock = spi.openSocket(addr, timeoutHelper);
openSock = true;
@@ -506,7 +506,7 @@ class ClientImpl extends TcpDiscoveryImpl {
req.client(true);
- spi.writeToSocket(sock, req, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
@@ -536,7 +536,7 @@ class ClientImpl extends TcpDiscoveryImpl {
msg.client(true);
- spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -544,7 +544,7 @@ class ClientImpl extends TcpDiscoveryImpl {
log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
", rmtNodeId=" + rmtNodeId + ']');
- return new T3<>(sock, spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0)), res.clientAck());
+ return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), res.clientAck());
}
catch (IOException | IgniteCheckedException e) {
U.closeQuiet(sock);
@@ -559,7 +559,7 @@ class ClientImpl extends TcpDiscoveryImpl {
errs.add(e);
- if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ if (timeoutHelper.checkThresholdReached(e))
break;
if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2a09c62..1f98ba8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -462,7 +462,7 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
- IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
if (F.contains(spi.locNodeAddrs, addr)) {
if (clientNodeId == null)
@@ -476,7 +476,7 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean clientPingRes;
try {
- clientPingRes = clientWorker.ping(timeoutCtrl);
+ clientPingRes = clientWorker.ping(timeoutHelper);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -508,12 +508,12 @@ class ServerImpl extends TcpDiscoveryImpl {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr, timeoutCtrl);
+ sock = spi.openSocket(addr, timeoutHelper);
spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
- timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+ timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
- TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
+ TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
spi.getAckTimeout()));
if (locNodeId.equals(res.creatorNodeId())) {
@@ -537,7 +537,7 @@ class ServerImpl extends TcpDiscoveryImpl {
errs.add(e);
- if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ if (timeoutHelper.checkThresholdReached(e))
break;
else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
break;
@@ -600,14 +600,8 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void onDataReceived() {
- if (spi.failureDetectionThresholdEnabled()) {
- if (locNode != null)
- locNode.lastDataReceivedTime(U.currentTimeMillis());
-
- if (msgWorker != null)
- // Node receives messages from remote nodes, reset this flag.
- msgWorker.failureDetectionNotified = false;
- }
+ if (spi.failureDetectionThresholdEnabled() && locNode != null)
+ locNode.lastDataReceivedTime(U.currentTimeMillis());
}
/**
@@ -894,7 +888,7 @@ class ServerImpl extends TcpDiscoveryImpl {
UUID locNodeId = getLocalNodeId();
- IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
int reconCnt = 0;
@@ -910,15 +904,15 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr, timeoutCtrl);
+ sock = spi.openSocket(addr, timeoutHelper);
openSock = true;
// Handshake.
- spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutCtrl.nextTimeoutChunk(
+ spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
spi.getSocketTimeout()));
- TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
ackTimeout0));
if (locNodeId.equals(res.creatorNodeId())) {
@@ -933,7 +927,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Send message.
tstamp = U.currentTimeMillis();
- spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -950,7 +944,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// E.g. due to class not found issue.
joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
- return spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
+ return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
}
catch (ClassCastException e) {
// This issue is rarely reproducible on AmazonEC2, but never
@@ -976,7 +970,7 @@ class ServerImpl extends TcpDiscoveryImpl {
errs.add(e);
- if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ if (timeoutHelper.checkThresholdReached(e))
break;
if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
@@ -1774,8 +1768,8 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Time when the last status message has been sent. */
private long lastTimeConnCheckMsgSent;
- /** Whether an error message has been printed out when failure detection threshold is reached. */
- private volatile boolean failureDetectionNotified;
+ /** Flag that keeps info on whether the threshold is reached or not. */
+ private boolean failureThresholdReached;
/** Last time hearbeat message has been sent. */
private long lastTimeHbMsgSent;
@@ -1783,7 +1777,7 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
*/
protected RingMessageWorker() {
- super("tcp-disco-msg-worker");
+ super("tcp-disco-msg-worker", 10);
}
/**
@@ -1843,6 +1837,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (spi.ensured(msg))
msgHist.add(msg);
+ if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId()))
+ // Reset the flag.
+ failureThresholdReached = false;
+
spi.stats.onMessageProcessingFinished(msg);
}
@@ -1921,16 +1919,11 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode)
debugLog("No next node in topology.");
- if (ring.hasRemoteNodes()) {
+ /*if (ring.hasRemoteNodes()) {
msg.senderNodeId(locNodeId);
- if (msg instanceof TcpDiscoveryConnectionCheckMessage ||
- (msg instanceof TcpDiscoveryStatusCheckMessage &&
- ((TcpDiscoveryStatusCheckMessage)msg).replacedConnCheckMsg()))
- break;
-
addMessage(msg);
- }
+ }*/
break;
}
@@ -1975,12 +1968,12 @@ class ServerImpl extends TcpDiscoveryImpl {
int reconCnt = 0;
- IgniteSpiOperationTimeoutController timeoutCtrl = null;
+ IgniteSpiOperationTimeoutHelper timeoutHelper = null;
while (true) {
if (sock == null) {
- if (timeoutCtrl == null)
- timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+ if (timeoutHelper == null)
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
nextNodeExists = false;
@@ -1992,16 +1985,16 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr, timeoutCtrl);
+ sock = spi.openSocket(addr, timeoutHelper);
openSock = true;
// Handshake.
writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
- timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+ timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
- timeoutCtrl.nextTimeoutChunk(ackTimeout0));
+ timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -2088,7 +2081,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
break;
- if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ if (timeoutHelper.checkThresholdReached(e))
break;
else if (!spi.failureDetectionThresholdEnabled() && (e instanceof
SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
@@ -2111,7 +2104,7 @@ class ServerImpl extends TcpDiscoveryImpl {
nextNodeExists = true;
// Resetting timeout control object to let the code below to use a new one
// for the next bunch of operations.
- timeoutCtrl = null;
+ timeoutHelper = null;
}
}
}
@@ -2149,11 +2142,11 @@ class ServerImpl extends TcpDiscoveryImpl {
prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
pendingMsgs.discardId);
- if (timeoutCtrl == null)
- timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+ if (timeoutHelper == null)
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
try {
- writeToSocket(sock, pendingMsg, timeoutCtrl.nextTimeoutChunk(
+ writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk(
spi.getSocketTimeout()));
}
finally {
@@ -2162,7 +2155,7 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
- int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
+ int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
log.debug("Pending message has been sent to next node [msg=" + msg.id() +
@@ -2176,7 +2169,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Resetting timeout control object to create a new one for the next bunch of
// operations.
- timeoutCtrl = null;
+ timeoutHelper = null;
}
}
@@ -2185,14 +2178,14 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- if (timeoutCtrl == null)
- timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+ if (timeoutHelper == null)
+ timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
- writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+ writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
- int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
+ int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
log.debug("Message has been sent to next node [msg=" + msg +
@@ -2227,7 +2220,7 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']',
e);
- if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ if (timeoutHelper.checkThresholdReached(e))
break;
if (!spi.failureDetectionThresholdEnabled()) {
@@ -4045,14 +4038,17 @@ class ServerImpl extends TcpDiscoveryImpl {
* Check connection aliveness status.
*/
private void checkConnection() {
- if (!failureDetectionNotified && U.currentTimeMillis() - locNode.lastDataReceivedTime()
+ if (!spi.failureDetectionThresholdEnabled())
+ return;
+
+ if (!failureThresholdReached && U.currentTimeMillis() - locNode.lastDataReceivedTime()
>= spi.failureDetectionThreshold() && ring.hasRemoteNodes() && spiStateCopy() == CONNECTED) {
log.info("Local node seems to be disconnected from topology (failure detection threshold " +
"is reached): [failureDetectionThreshold=" + spi.failureDetectionThreshold() +
", connCheckFreq=" + spi.connCheckFreq + ']');
- failureDetectionNotified = true;
+ failureThresholdReached = true;
}
long elapsed = (lastTimeConnCheckMsgSent + spi.connCheckFreq) - U.currentTimeMillis();
@@ -4261,17 +4257,17 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId);
- IgniteSpiOperationTimeoutController timeoutCtrl =
- new IgniteSpiOperationTimeoutController(spi);
+ IgniteSpiOperationTimeoutHelper timeoutHelper =
+ new IgniteSpiOperationTimeoutHelper(spi);
if (req.clientNodeId() != null) {
ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId());
if (clientWorker != null)
- res.clientExists(clientWorker.ping(timeoutCtrl));
+ res.clientExists(clientWorker.ping(timeoutHelper));
}
- spi.writeToSocket(sock, res, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
}
else if (log.isDebugEnabled())
log.debug("Ignore ping request, node is stopping.");
@@ -4831,7 +4827,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param clientNodeId Node ID.
*/
protected ClientMessageWorker(Socket sock, UUID clientNodeId) {
- super("tcp-disco-client-message-worker");
+ super("tcp-disco-client-message-worker", 2000);
this.sock = sock;
this.clientNodeId = clientNodeId;
@@ -4928,11 +4924,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * @param timeoutCtrl Timeout controller.
+ * @param timeoutHelper Timeout helper.
* @return Ping result.
* @throws InterruptedException If interrupted.
*/
- public boolean ping(IgniteSpiOperationTimeoutController timeoutCtrl) throws InterruptedException {
+ public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws InterruptedException {
if (spi.isNodeStopping0())
return false;
@@ -4958,7 +4954,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
try {
- return fut.get(timeoutCtrl.nextTimeoutChunk(spi.getAckTimeout()),
+ return fut.get(timeoutHelper.nextTimeoutChunk(spi.getAckTimeout()),
TimeUnit.MILLISECONDS);
}
catch (IgniteInterruptedCheckedException ignored) {
@@ -4998,12 +4994,18 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Backed interrupted flag. */
private volatile boolean interrupted;
+ /** Polling timeout. */
+ private final long pollingTimeout;
+
/**
* @param name Thread name.
+ * @param pollingTimeout Messages polling timeout.
*/
- protected MessageWorkerAdapter(String name) {
+ protected MessageWorkerAdapter(String name, long pollingTimeout) {
super(spi.ignite().name(), name, log);
+ this.pollingTimeout = pollingTimeout;
+
setPriority(spi.threadPri);
}
@@ -5013,14 +5015,12 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']');
while (!isInterrupted()) {
- TcpDiscoveryAbstractMessage msg = queue.poll(10, TimeUnit.MILLISECONDS);
+ TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
- if (msg == null) {
+ if (msg == null)
noMessageLoop();
- continue;
- }
-
- processMessage(msg);
+ else
+ processMessage(msg);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 87848d4..588ff98 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1178,11 +1178,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/**
* @param sockAddr Remote address.
- * @param timeoutCtrl Timeout controller.
+ * @param timeoutHelper Timeout helper.
* @return Opened socket.
* @throws IOException If failed.
*/
- protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutController timeoutCtrl)
+ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IOException, IgniteSpiOperationTimeoutException {
assert sockAddr != null;
@@ -1199,9 +1199,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
sock.setTcpNoDelay(true);
- sock.connect(resolved, (int)timeoutCtrl.nextTimeoutChunk(sockTimeout));
+ sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
- writeToSocket(sock, U.IGNITE_HEADER, timeoutCtrl.nextTimeoutChunk(sockTimeout));
+ writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
return sock;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 994b7b5..a50b060 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -2123,10 +2123,10 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected Socket openSocket(InetSocketAddress sockAddr,
- IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
+ IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
waitFor(openSockLock);
- return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutController(this));
+ return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutHelper(this));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c399a828/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
index 362be15..1ee839c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -308,15 +308,15 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
/** {@inheritDoc} */
- @Override protected Socket openSocket(InetSocketAddress sockAddr,
- IgniteSpiOperationTimeoutController timeoutCtrl) throws IOException, IgniteSpiOperationTimeoutException {
+ @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+ throws IOException, IgniteSpiOperationTimeoutException {
if (openSocketTimeout) {
err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout");
throw err;
}
else if (openSocketTimeoutWait) {
- long timeout = timeoutCtrl.nextTimeoutChunk(0);
+ long timeout = timeoutHelper.nextTimeoutChunk(0);
try {
Thread.sleep(timeout + 1000);
@@ -326,14 +326,14 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
}
try {
- timeoutCtrl.nextTimeoutChunk(0);
+ timeoutHelper.nextTimeoutChunk(0);
}
catch (IgniteSpiOperationTimeoutException e) {
throw (err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait"));
}
}
- Socket sock = super.openSocket(sockAddr, timeoutCtrl);
+ Socket sock = super.openSocket(sockAddr, timeoutHelper);
try {
Thread.sleep(1500);