You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/19 12:35:17 UTC
[31/50] incubator-ignite git commit: # ignite-1003 add
lastSuccessfulAddr for ping
# ignite-1003 add lastSuccessfulAddr for ping
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b23f9300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b23f9300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b23f9300
Branch: refs/heads/ignite-950
Commit: b23f9300794c7e84dfc83c1b8e49de673fa354e1
Parents: d874b00
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 18 13:52:45 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 18 13:52:45 2015 +0300
----------------------------------------------------------------------
.../communication/tcp/TcpCommunicationSpi.java | 38 ++++++++++----------
.../ignite/spi/discovery/tcp/ServerImpl.java | 21 +++++++----
.../spi/discovery/tcp/TcpDiscoverySpi.java | 9 ++++-
.../tcp/internal/TcpDiscoveryNode.java | 18 ++++++++++
4 files changed, 60 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b23f9300/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 9e38788..39f4eeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -267,23 +267,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (!isNodeStopping()) {
GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor();
- if (!getSpiContext().tryFailNode(id)) {
- if (recoveryData != null) {
- if (recoveryData.nodeAlive(getSpiContext().node(id))) {
- if (!recoveryData.messagesFutures().isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Session was closed but there are unacknowledged messages, " +
- "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
-
- commWorker.addReconnectRequest(recoveryData);
- }
+ if (recoveryData != null) {
+ if (recoveryData.nodeAlive(getSpiContext().node(id))) {
+ if (!recoveryData.messagesFutures().isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Session was closed but there are unacknowledged messages, " +
+ "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
+
+ commWorker.addReconnectRequest(recoveryData);
}
- else
- recoveryData.onNodeLeft();
}
+ else
+ recoveryData.onNodeLeft();
}
- else
- recoveryData.onNodeLeft();
}
}
@@ -1884,18 +1880,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
"TCP communication addresses or mapped external addresses. Check configuration and make sure " +
"that you use the same communication SPI on all nodes. Remote node id: " + node.id());
- List<InetSocketAddress> addrs;
+ LinkedHashSet<InetSocketAddress> addrs;
// Try to connect first on bound addresses.
if (isRmtAddrsExist) {
- addrs = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
+ List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
- Collections.sort(addrs, U.inetAddressesComparator(sameHost));
+ Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
+
+ addrs = new LinkedHashSet<>(addrs0);
}
else
- addrs = new ArrayList<>();
+ addrs = new LinkedHashSet<>();
// Then on mapped external addresses.
if (isExtAddrsExist)
@@ -2610,7 +2608,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
ClusterNode node = recoveryDesc.node();
- if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+ if (clients.containsKey(node.id()) ||
+ !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
+ !getSpiContext().pingNode(node.id()))
return;
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b23f9300/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 63f165d..8a9553e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -440,7 +440,12 @@ class ServerImpl extends TcpDiscoveryImpl {
// ID returned by the node should be the same as ID of the parameter for ping to succeed.
IgniteBiTuple<UUID, Boolean> t = pingNode(addr, clientNodeId);
- return node.id().equals(t.get1()) && (clientNodeId == null || t.get2());
+ boolean res = node.id().equals(t.get1()) && (clientNodeId == null || t.get2());
+
+ if (res)
+ node.lastSuccessfulAddress(addr);
+
+ return res;
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -458,8 +463,9 @@ class ServerImpl extends TcpDiscoveryImpl {
* Pings the node by its address to see if it's alive.
*
* @param addr Address of the node.
+ * @param clientNodeId Client node ID.
* @return ID of the remote node and "client exists" flag if node alive.
- * @throws IgniteSpiException If an error occurs.
+ * @throws IgniteCheckedException If an error occurs.
*/
private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress addr, @Nullable UUID clientNodeId)
throws IgniteCheckedException {
@@ -1589,8 +1595,7 @@ class ServerImpl extends TcpDiscoveryImpl {
regAddrs,
F.notContains(currAddrs),
new P1<InetSocketAddress>() {
- private final Map<InetSocketAddress, Boolean> pingResMap =
- new HashMap<>();
+ private final Map<InetSocketAddress, Boolean> pingResMap = new HashMap<>();
@Override public boolean apply(InetSocketAddress addr) {
Boolean res = pingResMap.get(addr);
@@ -2092,6 +2097,8 @@ class ServerImpl extends TcpDiscoveryImpl {
errs = null;
success = true;
+
+ next.lastSuccessfulAddress(addr);
}
}
catch (IOException | IgniteCheckedException e) {
@@ -2672,6 +2679,8 @@ class ServerImpl extends TcpDiscoveryImpl {
try {
sendMessageDirectly(msg, addr);
+ node.lastSuccessfulAddress(addr);
+
ex = null;
break;
@@ -4588,7 +4597,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- *
+ * @param res Ping result.
*/
public void pingResult(boolean res) {
GridFutureAdapter<Boolean> fut = pingFut.getAndSet(null);
@@ -4598,7 +4607,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- *
+ * @throws InterruptedException If interrupted.
*/
public boolean ping() throws InterruptedException {
if (spi.isNodeStopping0())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b23f9300/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index e4ef744..baada21 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -892,7 +892,14 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
Collections.sort(addrs, U.inetAddressesComparator(sameHost));
- LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>(addrs);
+ LinkedHashSet<InetSocketAddress> res = new LinkedHashSet<>();
+
+ InetSocketAddress lastAddr = node.lastSuccessfulAddress();
+
+ if (lastAddr != null)
+ res.add(lastAddr);
+
+ res.addAll(addrs);
Collection<InetSocketAddress> extAddrs = node.attribute(createSpiAttributeName(ATTR_EXT_ADDRS));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b23f9300/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index cc61c9d..36ae39e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -111,6 +111,10 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
@GridToStringExclude
private UUID clientRouterNodeId;
+ /** */
+ @GridToStringExclude
+ private volatile transient InetSocketAddress lastSuccessfulAddr;
+
/**
* Public default no-arg constructor for {@link Externalizable} interface.
*/
@@ -152,6 +156,20 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
sockAddrs = U.toSocketAddresses(this, discPort);
}
+ /**
+ * @return Last successfully connected address.
+ */
+ @Nullable public InetSocketAddress lastSuccessfulAddress() {
+ return lastSuccessfulAddr;
+ }
+
+ /**
+ * @param lastSuccessfulAddr Last successfully connected address.
+ */
+ public void lastSuccessfulAddress(InetSocketAddress lastSuccessfulAddr) {
+ this.lastSuccessfulAddr = lastSuccessfulAddr;
+ }
+
/** {@inheritDoc} */
@Override public UUID id() {
return id;