You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/14 17:22:00 UTC

[36/42] incubator-ignite git commit: # IGNITE-709 Fail remote nodes on TcpClientDiscoverySpi.dissconnect()

# IGNITE-709 Fail remote nodes on TcpClientDiscoverySpi.dissconnect()


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bcff2103
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bcff2103
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bcff2103

Branch: refs/heads/ignite-836_2
Commit: bcff2103026318e50f6a4e3ac5d018bd26073e2e
Parents: 71f9172
Author: sevdokimov <se...@gridgain.com>
Authored: Thu May 14 16:20:16 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu May 14 16:20:16 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheUtils.java        |  14 +-
 .../GridDhtPartitionsExchangeFuture.java        |   7 -
 .../discovery/tcp/TcpClientDiscoverySpi.java    | 139 +++++++++++--------
 .../tcp/internal/TcpDiscoveryNodesRing.java     |   2 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |   4 +
 5 files changed, 95 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 549f42f..8e082c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -631,9 +631,14 @@ public class GridCacheUtils {
      * @return Oldest node for the given topology version.
      */
     public static ClusterNode oldest(GridCacheContext cctx, AffinityTopologyVersion topOrder) {
+        Collection<ClusterNode> aliveCacheNodes = aliveNodes(cctx, topOrder);
+
+        if (aliveCacheNodes.isEmpty())
+            return cctx.localNode();
+
         ClusterNode oldest = null;
 
-        for (ClusterNode n : aliveNodes(cctx, topOrder))
+        for (ClusterNode n : aliveCacheNodes)
             if (oldest == null || n.order() < oldest.order())
                 oldest = n;
 
@@ -651,9 +656,14 @@ public class GridCacheUtils {
      * @return Oldest node for the given topology version.
      */
     public static ClusterNode oldest(GridCacheSharedContext cctx, AffinityTopologyVersion topOrder) {
+        Collection<ClusterNode> aliveCacheNodes = aliveCacheNodes(cctx, topOrder);
+
+        if (aliveCacheNodes.isEmpty())
+            return cctx.localNode();
+
         ClusterNode oldest = null;
 
-        for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) {
+        for (ClusterNode n : aliveCacheNodes) {
             if (oldest == null || n.order() < oldest.order())
                 oldest = n;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4b8db00..92dad4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -398,13 +398,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @return Oldest node.
-     */
-    ClusterNode oldestNode() {
-        return oldestNode.get();
-    }
-
-    /**
      * @return Exchange ID.
      */
     public GridDhtPartitionExchangeId exchangeId() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 8cdd7d1..8b32dd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -73,6 +73,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
 
+    /** Topology history. */
+    private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
+
     /** Remote nodes. */
     private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>();
 
@@ -345,11 +348,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
 
     /** {@inheritDoc} */
     @Override public Collection<ClusterNode> getRemoteNodes() {
-        return F.view(U.<TcpDiscoveryNode, ClusterNode>arrayList(rmtNodes.values(), new P1<TcpDiscoveryNode>() {
-            @Override public boolean apply(TcpDiscoveryNode node) {
-                return node.visible();
-            }
-        }));
+        return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
     }
 
     /** {@inheritDoc} */
@@ -419,6 +418,29 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
         U.join(msgWorker, log);
         U.join(sockWriter, log);
         U.join(sockReader, log);
+
+        leaveLatch.countDown();
+        joinLatch.countDown();
+
+        getSpiContext().deregisterPorts();
+
+        Collection<ClusterNode> rmts = getRemoteNodes();
+
+        // This is restart/disconnection and remote nodes are not empty.
+        // We need to fire FAIL event for each.
+        DiscoverySpiListener lsnr = this.lsnr;
+
+        if (lsnr != null) {
+            for (ClusterNode n : rmts) {
+                rmtNodes.remove(n.id());
+
+                Collection<ClusterNode> top = updateTopologyHistory(topVer + 1);
+
+                lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null);
+            }
+        }
+
+        rmtNodes.clear();
     }
 
     /** {@inheritDoc} */
@@ -555,6 +577,47 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
     }
 
     /**
+     * @param topVer New topology version.
+     * @return Latest topology snapshot.
+     */
+    private NavigableSet<ClusterNode> updateTopologyHistory(long topVer) {
+        this.topVer = topVer;
+
+        NavigableSet<ClusterNode> allNodes = allVisibleNodes();
+
+        if (!topHist.containsKey(topVer)) {
+            assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
+                "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
+
+            topHist.put(topVer, allNodes);
+
+            if (topHist.size() > topHistSize)
+                topHist.pollFirstEntry();
+
+            assert topHist.lastKey() == topVer;
+            assert topHist.size() <= topHistSize;
+        }
+
+        return allNodes;
+    }
+
+    /**
+     * @return All nodes.
+     */
+    private NavigableSet<ClusterNode> allVisibleNodes() {
+        NavigableSet<ClusterNode> allNodes = new TreeSet<>();
+
+        for (TcpDiscoveryNode node : rmtNodes.values()) {
+            if (node.visible())
+                allNodes.add(node);
+        }
+
+        allNodes.add(locNode);
+
+        return allNodes;
+    }
+
+    /**
      * @param addr Address.
      * @return Remote node ID.
      * @throws IOException In case of I/O error.
@@ -932,9 +995,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
      * Message worker.
      */
     private class MessageWorker extends IgniteSpiThread {
-        /** Topology history. */
-        private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
-
         /** Message queue. */
         private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
 
@@ -1042,7 +1102,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                             reconnector.cancel();
                             reconnector.join();
 
-                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allNodes());
+                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
                         }
                     }
                     else {
@@ -1113,7 +1173,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
             else if (msg instanceof TcpDiscoveryClientPingResponse)
                 processClientPingResponse((TcpDiscoveryClientPingResponse)msg);
             else if (msg instanceof TcpDiscoveryPingRequest)
-                processPingRequest((TcpDiscoveryPingRequest)msg);
+                processPingRequest();
 
             stats.onMessageProcessingFinished(msg);
         }
@@ -1223,7 +1283,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                 if (locNodeVer.equals(node.version()))
                     node.version(locNodeVer);
 
-                Collection<ClusterNode> top = updateTopologyHistory(topVer);
+                NavigableSet<ClusterNode> top = updateTopologyHistory(topVer);
 
                 if (!pending && joinLatch.getCount() > 0) {
                     if (log.isDebugEnabled())
@@ -1261,7 +1321,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                     return;
                 }
 
-                Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+                NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
 
                 if (!pending && joinLatch.getCount() > 0) {
                     if (log.isDebugEnabled())
@@ -1303,7 +1363,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                     return;
                 }
 
-                Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
+                NavigableSet<ClusterNode> top = updateTopologyHistory(msg.topologyVersion());
 
                 if (!pending && joinLatch.getCount() > 0) {
                     if (log.isDebugEnabled())
@@ -1399,7 +1459,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                         try {
                             Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
 
-                            notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allNodes(), msgObj);
+                            notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
                         }
                         catch (IgniteCheckedException e) {
                             U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -1423,10 +1483,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
 
         /**
          * Router want to ping this client.
-         *
-         * @param msg Message.
          */
-        private void processPingRequest(TcpDiscoveryPingRequest msg) {
+        private void processPingRequest() {
             sockWriter.sendMessage(new TcpDiscoveryPingResponse(getLocalNodeId()));
         }
 
@@ -1453,60 +1511,19 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
 
                 node.lastUpdateTime(tstamp);
 
-                notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes());
+                notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes());
             }
             else if (log.isDebugEnabled())
                 log.debug("Received metrics from unknown node: " + nodeId);
         }
 
         /**
-         * @param topVer New topology version.
-         * @return Latest topology snapshot.
-         */
-        private Collection<ClusterNode> updateTopologyHistory(long topVer) {
-            TcpClientDiscoverySpi.this.topVer = topVer;
-
-            Collection<ClusterNode> allNodes = allNodes();
-
-            if (!topHist.containsKey(topVer)) {
-                assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
-                    "lastVer=" + topHist.lastKey() + ", newVer=" + topVer;
-
-                topHist.put(topVer, allNodes);
-
-                if (topHist.size() > topHistSize)
-                    topHist.pollFirstEntry();
-
-                assert topHist.lastKey() == topVer;
-                assert topHist.size() <= topHistSize;
-            }
-
-            return allNodes;
-        }
-
-        /**
-         * @return All nodes.
-         */
-        private Collection<ClusterNode> allNodes() {
-            Collection<ClusterNode> allNodes = new TreeSet<>();
-
-            for (TcpDiscoveryNode node : rmtNodes.values()) {
-                if (node.visible())
-                    allNodes.add(node);
-            }
-
-            allNodes.add(locNode);
-
-            return allNodes;
-        }
-
-        /**
          * @param type Event type.
          * @param topVer Topology version.
          * @param node Node.
          * @param top Topology snapshot.
          */
-        private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top) {
+        private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top) {
             notifyDiscovery(type, topVer, node, top, null);
         }
 
@@ -1516,7 +1533,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
          * @param node Node.
          * @param top Topology snapshot.
          */
-        private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top,
+        private void notifyDiscovery(int type, long topVer, ClusterNode node, NavigableSet<ClusterNode> top,
             @Nullable Serializable data) {
             DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index e866504..e9eaa1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -32,7 +32,7 @@ import java.util.concurrent.locks.*;
  */
 public class TcpDiscoveryNodesRing {
     /** Visible nodes filter. */
-    private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
+    public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() {
         @Override public boolean apply(TcpDiscoveryNode node) {
             return node.visible();
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff2103/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 1268a23..119fc53 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -518,6 +518,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         };
         G.addListener(lsnr);
 
+        final TcpClientDiscoverySpi client2Disco = (TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi();
+
         try {
             failServer(2);
 
@@ -531,6 +533,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         finally {
             G.removeListener(lsnr);
         }
+
+        assert client2Disco.getRemoteNodes().isEmpty();
     }
 
     /**