You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/14 15:20:59 UTC
[2/2] incubator-ignite git commit: # IGNITE-709 Fail remote nodes on
TcpClientDiscoverySpi.dissconnect()
# IGNITE-709 Fail remote nodes on TcpClientDiscoverySpi.dissconnect()
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bcff2103
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bcff2103
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bcff2103
Branch: refs/heads/ignite-709_2
Commit: bcff2103026318e50f6a4e3ac5d018bd26073e2e
Parents: 71f9172
Author: sevdokimov <se...@gridgain.com>
Authored: Thu May 14 16:20:16 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu May 14 16:20:16 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheUtils.java | 14 +-
.../GridDhtPartitionsExchangeFuture.java | 7 -
.../discovery/tcp/TcpClientDiscoverySpi.java | 139 +++++++++++--------
.../tcp/internal/TcpDiscoveryNodesRing.java | 2 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 4 +
5 files changed, 95 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 549f42f..8e082c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -631,9 +631,14 @@ public class GridCacheUtils {
* @return Oldest node for the given topology version.
*/
public static ClusterNode oldest(GridCacheContext cctx, AffinityTopologyVersion topOrder) {
+ Collection<ClusterNode> aliveCacheNodes = aliveNodes(cctx, topOrder);
+
+ if (aliveCacheNodes.isEmpty())
+ return cctx.localNode();
+
ClusterNode oldest = null;
- for (ClusterNode n : aliveNodes(cctx, topOrder))
+ for (ClusterNode n : aliveCacheNodes)
if (oldest == null || n.order() < oldest.order())
oldest = n;
@@ -651,9 +656,14 @@ public class GridCacheUtils {
* @return Oldest node for the given topology version.
*/
public static ClusterNode oldest(GridCacheSharedContext cctx, AffinityTopologyVersion topOrder) {
+ Collection<ClusterNode> aliveCacheNodes = aliveCacheNodes(cctx, topOrder);
+
+ if (aliveCacheNodes.isEmpty())
+ return cctx.localNode();
+
ClusterNode oldest = null;
- for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) {
+ for (ClusterNode n : aliveCacheNodes) {
if (oldest == null || n.order() < oldest.order())
oldest = n;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4b8db00..92dad4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -398,13 +398,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @return Oldest node.
- */
- ClusterNode oldestNode() {
- return oldestNode.get();
- }
-
- /**
* @return Exchange ID.
*/
public GridDhtPartitionExchangeId exchangeId() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 8cdd7d1..8b32dd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -73,6 +73,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
/** Remote nodes. */
private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
+ /** Topology history. */
+ private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
/** Remote nodes. */
private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>();
@@ -345,11 +348,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
/** {@inheritDoc} */
@Override public Collection<ClusterNode> getRemoteNodes() {
- return F.view(U.<TcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), new P1<TcpDiscoveryNode>() {
- @Override public boolean apply(TcpDiscoveryNode node) {
- return node.visible();
- }
- }));
+ return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
}
/** {@inheritDoc} */
@@ -419,6 +418,29 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
U.join(msgWorker, log);
U.join(sockWriter, log);
U.join(sockReader, log);
+
+ leaveLatch.countDown();
+ joinLatch.countDown();
+
+ getSpiContext().deregisterPorts();
+
+ Collection<ClusterNode> rmts = getRemoteNodes();
+
+ // This is restart/disconnection and remote nodes are not empty.
+ // We need to fire FAIL event for each.
+ DiscoverySpiListener lsnr = this.lsnr;
+
+ if (lsnr != null) {
+ for (ClusterNode n : rmts) {
+ rmtNodes.remove(n.id());
+
+ Collection<ClusterNode> top = updateTopologyHistory(topVer + 1);
+
+ lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null);
+ }
+ }
+
+ rmtNodes.clear();
}
/** {@inheritDoc} */
@@ -555,6 +577,47 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
}
/**
+ * @param topVer New topology version.
+ * @return Latest topology snapshot.
+ */
+ private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) {
+ this.topVer = topVer;
+
+ NavigableSet<ClusterNode> allNodes = allVisibleNodes();
+
+ if (!topHist.containsKey(topVer)) {
+ assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+ "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+
+ topHist.put(topVer, allNodes);
+
+ if (topHist.size() > topHistSize)
+ topHist.pollFirstEntry();
+
+ assert topHist.lastKey() == topVer;
+ assert topHist.size() <= topHistSize;
+ }
+
+ return allNodes;
+ }
+
+ /**
+ * @return All nodes.
+ */
+ private NavigableSet<ClusterNode> allVisibleNodes() {
+ NavigableSet<ClusterNode> allNodes = new TreeSet<>();
+
+ for (TcpDiscoveryNode node : rmtNodes.values()) {
+ if (node.visible())
+ allNodes.add(node);
+ }
+
+ allNodes.add(locNode);
+
+ return allNodes;
+ }
+
+ /**
* @param addr Address.
* @return Remote node ID.
* @throws IOException In case of I/O error.
@@ -932,9 +995,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
* Message worker.
*/
private class MessageWorker extends IgniteSpiThread {
- /** Topology history. */
- private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
-
/** Message queue. */
private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
@@ -1042,7 +1102,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
reconnector.cancel();
reconnector.join();
- notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allNodes());
+ notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
}
else {
@@ -1113,7 +1173,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
else if (msg instanceof TcpDiscoveryClientPingResponse)
processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
else if (msg instanceof TcpDiscoveryPingRequest)
- processPingRequest((TcpDiscoveryPingRequest)msg);
+ processPingRequest();
stats.onMessageProcessingFinished(msg);
}
@@ -1223,7 +1283,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
if (locNodeVer.equals(node.version()))
node.version(locNodeVer);
- Collection<ClusterNode> top = updateTopologyHistory(topVer);
+ NavigableSet<ClusterNode> top = updateTopologyHistory(topVer);
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
@@ -1261,7 +1321,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
return;
}
- Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+ NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
@@ -1303,7 +1363,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
return;
}
- Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+ NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
if (!pending && joinLatch.getCount() > 0) {
if (log.isDebugEnabled())
@@ -1399,7 +1459,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
try {
Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
- notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allNodes(), msgObj);
+ notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -1423,10 +1483,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
/**
* Router want to ping this client.
- *
- * @param msg Message.
*/
- private void processPingRequest(TcpDiscoveryPingRequest msg) {
+ private void processPingRequest() {
sockWriter.sendMessage(new TcpDiscoveryPingResponse(getLocalNodeId()));
}
@@ -1453,60 +1511,19 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
node.lastUpdateTime(tstamp);
- notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes());
+ notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes());
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}
/**
- * @param topVer New topology version.
- * @return Latest topology snapshot.
- */
- private Collection<ClusterNode> updateTopologyHistory(long topVer) {
- TcpClientDiscoverySpi.this.topVer = topVer;
-
- Collection<ClusterNode> allNodes = allNodes();
-
- if (!topHist.containsKey(topVer)) {
- assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
- "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
-
- topHist.put(topVer, allNodes);
-
- if (topHist.size() > topHistSize)
- topHist.pollFirstEntry();
-
- assert topHist.lastKey() == topVer;
- assert topHist.size() <= topHistSize;
- }
-
- return allNodes;
- }
-
- /**
- * @return All nodes.
- */
- private Collection<ClusterNode> allNodes() {
- Collection<ClusterNode> allNodes = new TreeSet<>();
-
- for (TcpDiscoveryNode node : rmtNodes.values()) {
- if (node.visible())
- allNodes.add(node);
- }
-
- allNodes.add(locNode);
-
- return allNodes;
- }
-
- /**
* @param type Event type.
* @param topVer Topology version.
* @param node Node.
* @param top Topology snapshot.
*/
- private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) {
+ private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) {
notifyDiscovery(type, topVer, node, top, null);
}
@@ -1516,7 +1533,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
* @param node Node.
* @param top Topology snapshot.
*/
- private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top,
+ private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top,
@Nullable Serializable data) {
DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index e866504..e9eaa1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -32,7 +32,7 @@ import java.util.concurrent.locks.*;
*/
public class TcpDiscoveryNodesRing {
/** Visible nodes filter. */
- private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
+ public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
@Override public boolean apply(TcpDiscoveryNode node) {
return node.visible();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 1268a23..119fc53 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -518,6 +518,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
};
G.addListener(lsnr);
+ final TcpClientDiscoverySpi client2Disco = (TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi();
+
try {
failServer(2);
@@ -531,6 +533,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
finally {
G.removeListener(lsnr);
}
+
+ assert client2Disco.getRemoteNodes().isEmpty();
}
/**