You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/15 16:12:04 UTC
[3/3] incubator-ignite git commit: ignite-752: keep reusing failure
detection threshold
ignite-752: keep reusing failure detection threshold
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1b6005a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1b6005a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1b6005a5
Branch: refs/heads/ignite-752
Commit: 1b6005a50584045cc11633541bc7b023ee399b32
Parents: 567aec1
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 15 17:11:45 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 15 17:11:45 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 61 -----------
.../IgniteSpiOperationTimeoutController.java | 93 ++++++++++++++++
.../spi/IgniteSpiOperationTimeoutException.java | 33 ++++++
.../ignite/spi/discovery/tcp/ServerImpl.java | 106 +++++++++++--------
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 16 ++-
6 files changed, 196 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 82ed3d0..422ce81 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -31,12 +31,10 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.resources.*;
-
import org.jetbrains.annotations.*;
import javax.management.*;
import java.io.*;
-import java.net.*;
import java.text.*;
import java.util.*;
@@ -581,51 +579,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/**
* TODO: IGNITE-752
- * @param dfltTimeout
- * @return
- */
- public long firstNetOperationTimeout(long dfltTimeout) {
- return !failureDetectionThresholdEnabled ? dfltTimeout : failureDetectionThreshold;
- }
-
- /**
- * TODO: IGNITE-752
- * @param curTimeout
- * @param lastOperStartTime
- * @param dfltTimeout
- * @return
- * @throws IOException
- */
- public long nextNetOperationTimeout(long curTimeout, long lastOperStartTime, long dfltTimeout)
- throws NetOperationTimeoutException {
- if (!failureDetectionThresholdEnabled)
- return dfltTimeout;
-
- long timeLeft = curTimeout - lastOperStartTime;
-
- if (timeLeft <= 0)
- throw new NetOperationTimeoutException("Network operation timed out. Increase failure detection threshold" +
- " using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts manually." +
- " Current failure detection threshold: " + failureDetectionThreshold);
-
- return timeLeft;
- }
-
- /**
- * TODO: IGNITE-752
- * @param e
- * @return
- */
- public boolean checkFailureDetectionThresholdReached(Exception e) {
- if (!failureDetectionThresholdEnabled)
- return false;
-
- return e instanceof NetOperationTimeoutException || e instanceof SocketTimeoutException ||
- X.hasCause(e, NetOperationTimeoutException.class, SocketException.class);
- }
-
- /**
- * TODO: IGNITE-752
* @param enabled
*/
public void failureDetectionThresholdEnabled(boolean enabled) {
@@ -648,20 +601,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
return failureDetectionThreshold;
}
-
- /**
- * TODO: IGNITE-752
- */
- public static class NetOperationTimeoutException extends IgniteCheckedException {
- /**
- * Constructor.
- * @param msg Error message.
- */
- public NetOperationTimeoutException(String msg) {
- super(msg);
- }
- }
-
/**
* Temporarily SPI context.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
new file mode 100644
index 0000000..3ae4fa4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutController.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * TODO: IGNITE-752
+ */
+public class IgniteSpiOperationTimeoutController {
+ /** */
+ private long lastOperStartTs;
+
+ /** */
+ private long timeout;
+
+ /** */
+ private final boolean failureDetectionThresholdEnabled;
+
+ /** */
+ private final long failureDetectionThreshold;
+
+ /**
+ * Constructor.
+ *
+ * @param adapter SPI adapter.
+ */
+ public IgniteSpiOperationTimeoutController(IgniteSpiAdapter adapter) {
+ failureDetectionThresholdEnabled = adapter.failureDetectionThresholdEnabled();
+ failureDetectionThreshold = adapter.failureDetectionThreshold();
+ }
+
+ /**
+ * TODO: IGNITE-752
+ * @param dfltTimeout
+ * @return
+ * @throws IgniteSpiOperationTimeoutException
+ */
+ public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException {
+ if (!failureDetectionThresholdEnabled)
+ return dfltTimeout;
+
+ if (lastOperStartTs == 0) {
+ timeout = failureDetectionThreshold;
+ lastOperStartTs = U.currentTimeMillis();
+ }
+ else {
+ long curTs = U.currentTimeMillis();
+
+ timeout = timeout - (curTs - lastOperStartTs);
+
+ lastOperStartTs = curTs;
+
+ if (timeout <= 0)
+ throw new IgniteSpiOperationTimeoutException("Network operation timed out. Increase failure detection" +
+ " threshold using IgniteConfiguration.setFailureDetectionThreshold() or set SPI specific timeouts" +
+ " manually. Current failure detection threshold: " + failureDetectionThreshold);
+ }
+
+ return timeout;
+ }
+
+ /**
+ * TODO: IGNITE-752
+ * @param e
+ * @return
+ */
+ public boolean checkFailureDetectionThresholdReached(Exception e) {
+ if (!failureDetectionThresholdEnabled)
+ return false;
+
+ return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException ||
+ X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
new file mode 100644
index 0000000..c90b45b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.*;
+
+/**
+ * TODO: IGNITE-752
+ */
+public class IgniteSpiOperationTimeoutException extends IgniteCheckedException {
+ /**
+ * Constructor.
+ * @param msg Error message.
+ */
+ public IgniteSpiOperationTimeoutException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9dd565c..d506507 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -50,7 +50,6 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.*;
import static org.apache.ignite.spi.IgnitePortProtocol.*;
import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*;
import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage.*;
-import static org.apache.ignite.spi.IgniteSpiAdapter.NetOperationTimeoutException;
/**
*
@@ -279,9 +278,10 @@ class ServerImpl extends TcpDiscoveryImpl {
msgWorker.addMessage(new TcpDiscoveryNodeLeftMessage(locNode.id()));
synchronized (mux) {
- long threshold = U.currentTimeMillis() + spi.netTimeout;
+ long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+ spi.getNetworkTimeout();
- long timeout = spi.netTimeout;
+ long threshold = U.currentTimeMillis() + timeout;
while (spiState != LEFT && timeout > 0) {
try {
@@ -509,8 +509,7 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
Socket sock = null;
- long timeout = 0;
- long lastOperStartTs = 0;
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
int reconCnt = 0;
@@ -519,22 +518,15 @@ class ServerImpl extends TcpDiscoveryImpl {
if (addr.isUnresolved())
addr = new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort());
- timeout = lastOperStartTs == 0 ? spi.firstNetOperationTimeout(spi.getSocketTimeout()) :
- spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout());
+ long tstamp = U.currentTimeMillis();
- long tstamp = lastOperStartTs = U.currentTimeMillis();
+ sock = spi.openSocket(addr, timeoutCtrl);
- sock = spi.openSocket(addr, timeout);
+ spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
+ timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
- timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getSocketTimeout());
- lastOperStartTs = U.currentTimeMillis();
-
- spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), timeout);
-
- timeout = spi.nextNetOperationTimeout(timeout, lastOperStartTs, spi.getNetworkTimeout());
- lastOperStartTs = U.currentTimeMillis();
-
- TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeout);
+ TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
+ spi.getNetworkTimeout()));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -557,7 +549,7 @@ class ServerImpl extends TcpDiscoveryImpl {
errs.add(e);
- if (spi.checkFailureDetectionThresholdReached(e))
+ if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
break;
else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
break;
@@ -690,9 +682,10 @@ class ServerImpl extends TcpDiscoveryImpl {
log.debug("Join request message has been sent (waiting for coordinator response).");
synchronized (mux) {
- long threshold = U.currentTimeMillis() + spi.netTimeout;
+ long timeout = spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() :
+ spi.getNetworkTimeout();
- long timeout = spi.netTimeout;
+ long threshold = U.currentTimeMillis() + timeout;
while (spiState == CONNECTING && timeout > 0) {
try {
@@ -734,8 +727,9 @@ class ServerImpl extends TcpDiscoveryImpl {
LT.warn(log, null, "Node has not been connected to topology and will repeat join process. " +
"Check remote nodes logs for possible error messages. " +
"Note that large topology may require significant time to start. " +
- "Increase 'TcpDiscoverySpi.networkTimeout' configuration property " +
- "if getting this message on the starting nodes [networkTimeout=" + spi.netTimeout + ']');
+ "Increase 'IgniteConfiguration.failureDetectionThreshold' configuration property " +
+ "if getting this message on the starting nodes [failureDetectionThreshold=" +
+ spi.failureDetectionThreshold() + ']');
}
}
@@ -855,10 +849,10 @@ class ServerImpl extends TcpDiscoveryImpl {
"(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
addrs);
- if (spi.joinTimeout > 0) {
+ if (spi.getJoinTimeout() > 0) {
if (noResStart == 0)
noResStart = U.currentTimeMillis();
- else if (U.currentTimeMillis() - noResStart > spi.joinTimeout)
+ else if (U.currentTimeMillis() - noResStart > spi.getJoinTimeout())
throw new IgniteSpiException(
"Failed to connect to any address from IP finder within join timeout " +
"(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
@@ -895,15 +889,17 @@ class ServerImpl extends TcpDiscoveryImpl {
Collection<Throwable> errs = null;
- long ackTimeout0 = spi.ackTimeout;
+ long ackTimeout0 = spi.getAckTimeout();
int connectAttempts = 1;
- boolean joinReqSent = false;
+ boolean joinReqSent;
UUID locNodeId = getLocalNodeId();
- for (int i = 0; i < spi.reconCnt; i++) {
+ IgniteSpiOperationTimeoutController timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
+ while (true){
// Need to set to false on each new iteration,
// since remote node may leave in the middle of the first iteration.
joinReqSent = false;
@@ -915,14 +911,16 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr);
+ sock = spi.openSocket(addr, timeoutCtrl);
openSock = true;
// Handshake.
- spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+ spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutCtrl.nextTimeoutChunk(
+ spi.getSocketTimeout()));
- TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutCtrl.nextTimeoutChunk(
+ ackTimeout0));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -936,7 +934,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Send message.
tstamp = U.currentTimeMillis();
- spi.writeToSocket(sock, msg);
+ spi.writeToSocket(sock, msg, timeoutCtrl.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -953,7 +951,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// E.g. due to class not found issue.
joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
- return spi.readReceipt(sock, ackTimeout0);
+ return spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
}
catch (ClassCastException e) {
// This issue is rarely reproducible on AmazonEC2, but never
@@ -979,6 +977,9 @@ class ServerImpl extends TcpDiscoveryImpl {
errs.add(e);
+ if (timeoutCtrl.checkFailureDetectionThresholdReached(e))
+ break;
+
if (!openSock) {
// Reconnect for the second time, if connection is not established.
if (connectAttempts < 2) {
@@ -990,7 +991,8 @@ class ServerImpl extends TcpDiscoveryImpl {
break; // Don't retry if we can not establish connection.
}
- if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) {
+ if (!spi.failureDetectionThresholdEnabled() && (e instanceof SocketTimeoutException ||
+ X.hasCause(e, SocketTimeoutException.class))) {
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
@@ -2029,7 +2031,7 @@ class ServerImpl extends TcpDiscoveryImpl {
List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());
addr: for (InetSocketAddress addr : spi.getNodeAddresses(next, sameHost)) {
- long ackTimeout0 = spi.ackTimeout;
+ long ackTimeout0 = spi.getAckTimeout();
if (locNodeAddrs.contains(addr)){
if (log.isDebugEnabled())
@@ -2039,8 +2041,11 @@ class ServerImpl extends TcpDiscoveryImpl {
continue;
}
- for (int i = 0; i < spi.reconCnt; i++) {
+ while (true) {
if (sock == null) {
+ IgniteSpiOperationTimeoutController timeoutCrt =
+ new IgniteSpiOperationTimeoutController(spi);
+
nextNodeExists = false;
boolean success = false;
@@ -2051,14 +2056,16 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
long tstamp = U.currentTimeMillis();
- sock = spi.openSocket(addr);
+ sock = spi.openSocket(addr, timeoutCrt);
openSock = true;
// Handshake.
- writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId));
+ writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
+ timeoutCrt.nextTimeoutChunk(spi.getNetworkTimeout()));
- TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
+ TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
+ timeoutCrt.nextTimeoutChunk(ackTimeout0));
if (locNodeId.equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -2142,8 +2149,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!openSock)
break; // Don't retry if we can not establish connection.
- if (e instanceof SocketTimeoutException ||
- X.hasCause(e, SocketTimeoutException.class)) {
+ if (timeoutCrt.checkFailureDetectionThresholdReached(e))
+ break;
+ else if (!spi.failureDetectionThresholdEnabled() && (e instanceof
+ SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class))) {
ackTimeout0 *= 2;
if (!checkAckTimeout(ackTimeout0))
@@ -2173,6 +2182,8 @@ class ServerImpl extends TcpDiscoveryImpl {
assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage;
+ IgniteSpiOperationTimeoutController timeoutCtrl;
+
if (failure || forceSndPending) {
if (log.isDebugEnabled())
log.debug("Pending messages will be sent [failure=" + failure +
@@ -2185,6 +2196,8 @@ class ServerImpl extends TcpDiscoveryImpl {
boolean skip = pendingMsgs.discardId != null;
for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
+ timeoutCtrl = new IgniteSpiOperationTimeoutController(spi);
+
if (skip) {
if (pendingMsg.id().equals(pendingMsgs.discardId))
skip = false;
@@ -2198,7 +2211,8 @@ class ServerImpl extends TcpDiscoveryImpl {
pendingMsgs.discardId);
try {
- writeToSocket(sock, pendingMsg);
+ writeToSocket(sock, pendingMsg, timeoutCtrl.nextTimeoutChunk(
+ spi.getSocketTimeout()));
}
finally {
clearNodeAddedMessage(pendingMsg);
@@ -2206,7 +2220,7 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp);
- int res = spi.readReceipt(sock, ackTimeout0);
+ int res = spi.readReceipt(sock, timeoutCtrl.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
log.debug("Pending message has been sent to next node [msg=" + msg.id() +
@@ -2222,6 +2236,7 @@ class ServerImpl extends TcpDiscoveryImpl {
prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+ timeoutCtrl
try {
long tstamp = U.currentTimeMillis();
@@ -4903,14 +4918,15 @@ class ServerImpl extends TcpDiscoveryImpl {
/**
* @param sock Socket.
* @param msg Message.
+ * @param timeout Socket timeout.
* @throws IOException If IO failed.
* @throws IgniteCheckedException If marshalling failed.
*/
- protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+ protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
throws IOException, IgniteCheckedException {
bout.reset();
- spi.writeToSocket(sock, msg, bout);
+ spi.writeToSocket(sock, msg, bout, timeout);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index ace917f..c4278ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -266,10 +266,10 @@ abstract class TcpDiscoveryImpl {
* maximum acknowledgement timeout, {@code false} otherwise.
*/
protected boolean checkAckTimeout(long ackTimeout) {
- if (ackTimeout > spi.maxAckTimeout) {
+ if (ackTimeout > spi.getMaxAckTimeout()) {
LT.warn(log, null, "Acknowledgement timeout is greater than maximum acknowledgement timeout " +
"(consider increasing 'maxAckTimeout' configuration property) " +
- "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.maxAckTimeout + ']');
+ "[ackTimeout=" + ackTimeout + ", maxAckTimeout=" + spi.getMaxAckTimeout() + ']');
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b6005a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index e5d5cd6..126bf03 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -676,7 +676,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/**
* Sets IP finder for IP addresses sharing and storing.
* <p>
- * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
+ * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will
+ * be used by default.
*
* @param ipFinder IP finder.
*/
@@ -1103,12 +1104,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/**
* @param sockAddr Remote address.
- * @param timeout Socket opening timeout.
+ * @param timeoutCtrl Timeout controller.
* @return Opened socket.
* @throws IOException If failed.
*/
- protected Socket openSocket(InetSocketAddress sockAddr, long timeout) throws IOException,
- NetOperationTimeoutException {
+ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutController timeoutCtrl)
+ throws IOException, IgniteSpiOperationTimeoutException {
assert sockAddr != null;
InetSocketAddress resolved = sockAddr.isUnresolved() ?
@@ -1126,11 +1127,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
long startTs = U.currentTimeMillis();
- sock.connect(resolved, (int)timeout);
+ sock.connect(resolved, (int)timeoutCtrl.nextTimeoutChunk(sockTimeout));
- timeout = nextNetOperationTimeout(timeout, startTs, sockTimeout);
-
- writeToSocket(sock, U.IGNITE_HEADER, timeout);
+ writeToSocket(sock, U.IGNITE_HEADER, timeoutCtrl.nextTimeoutChunk(sockTimeout));
return sock;
}
@@ -1148,7 +1147,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
assert sock != null;
assert data != null;
- //SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);