You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/30 14:49:09 UTC

[2/8] ignite git commit: ignite-1564 Fixed client cache reconnect issues

ignite-1564 Fixed client cache reconnect issues


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/273f291d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/273f291d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/273f291d

Branch: refs/heads/ignite-1534
Commit: 273f291d9fac0919d57b9ed732564b323a956f90
Parents: cd43967
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 30 13:48:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 30 13:48:48 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  22 +--
 .../processors/cache/GridCacheEventManager.java |  12 +-
 .../processors/cache/GridCacheProcessor.java    |  68 +++++---
 .../dht/preloader/GridDhtPreloader.java         |   6 +
 .../IgniteClientReconnectAbstractTest.java      |  35 ++++-
 .../IgniteClientReconnectCacheTest.java         | 154 +++++++++++++++++++
 6 files changed, 249 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 785613d..aec36a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -78,7 +78,7 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
+import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -208,7 +208,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
     /** Topology cache history. */
     private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist =
-        new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE, DISCOVERY_HISTORY_SIZE, 0.7f, 1);
+        new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE);
 
     /** Topology snapshots history. */
     private volatile Map<Long, Collection<ClusterNode>> topHist = new HashMap<>();
@@ -465,14 +465,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 final Map<Long, Collection<ClusterNode>> snapshots,
                 @Nullable DiscoverySpiCustomMessage spiCustomMsg
             ) {
-                if (type == EVT_NODE_JOINED && node.isLocal() && ctx.clientDisconnected()) {
-                    discoCacheHist.clear();
-
-                    topHist.clear();
-
-                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, null));
-                }
-
                 DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
                     : ((CustomMessageWrapper)spiCustomMsg).delegate();
 
@@ -593,6 +585,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     locJoinEvt = new GridFutureAdapter<>();
 
                     registeredCaches.clear();
+
+                    discoCacheHist.clear();
+
+                    topHist.clear();
+
+                    topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
+                        new DiscoCache(locNode, Collections.<ClusterNode>emptySet())));
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                     assert locNode.isClient() : locNode;
@@ -620,7 +619,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     return;
                 }
 
-                discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
+                if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected())
+                    discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
             }
         });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index c2f8f3f..751c316 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -40,14 +40,6 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
  * Cache event manager.
  */
 public class GridCacheEventManager extends GridCacheManagerAdapter {
-    /** Local node ID. */
-    private UUID locNodeId;
-
-    /** {@inheritDoc} */
-    @Override public void start0() {
-        locNodeId = cctx.localNodeId();
-    }
-
     /**
      * Adds local event listener.
      *
@@ -96,7 +88,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
     {
         addEvent(part,
             key,
-            locNodeId,
+            cctx.localNodeId(),
             tx,
             owner,
             type,
@@ -116,7 +108,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
         addEvent(
             0,
             null,
-            locNodeId,
+            cctx.localNodeId(),
             (IgniteUuid)null,
             null,
             type,

http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c86dfd9..6c13399 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -26,6 +26,7 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
@@ -197,6 +198,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** */
     private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
 
+    /** */
+    private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs;
+
     /**
      * @param ctx Kernal context.
      */
@@ -1050,6 +1054,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         }
 
+        if (clientReconnectReqs != null) {
+            for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet())
+                processClientReconnectData(e.getKey(), e.getValue());
+
+            clientReconnectReqs = null;
+        }
+
         sharedCtx.onReconnected();
 
         for (GridCacheAdapter cache : reconnected)
@@ -1881,28 +1892,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
 
             if (batch.clientReconnect()) {
-                for (DynamicCacheChangeRequest req : batch.requests()) {
-                    assert !req.template() : req;
-
-                    String name = req.cacheName();
-
-                    boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+                if (ctx.clientDisconnected()) {
+                    if (clientReconnectReqs == null)
+                        clientReconnectReqs = new LinkedHashMap<>();
 
-                    if (!sysCache) {
-                        DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
+                    clientReconnectReqs.put(joiningNodeId, batch);
 
-                        if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
-                            Map<UUID, Boolean> nodes = batch.clientNodes().get(name);
-
-                            assert nodes != null : req;
-                            assert nodes.containsKey(joiningNodeId) : nodes;
-
-                            ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, nodes.get(joiningNodeId));
-                        }
-                    }
-                    else
-                        ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, false);
+                    return;
                 }
+
+                processClientReconnectData(joiningNodeId, batch);
             }
             else {
                 for (DynamicCacheChangeRequest req : batch.requests()) {
@@ -1983,6 +1982,37 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param clientNodeId Client node ID.
+     * @param batch Cache change batch.
+     */
+    private void processClientReconnectData(UUID clientNodeId, DynamicCacheChangeBatch batch) {
+        assert batch.clientReconnect() : batch;
+
+        for (DynamicCacheChangeRequest req : batch.requests()) {
+            assert !req.template() : req;
+
+            String name = req.cacheName();
+
+            boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+
+            if (!sysCache) {
+                DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
+
+                if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
+                    Map<UUID, Boolean> nodes = batch.clientNodes().get(name);
+
+                    assert nodes != null : req;
+                    assert nodes.containsKey(clientNodeId) : nodes;
+
+                    ctx.discovery().addClientNode(req.cacheName(), clientNodeId, nodes.get(clientNodeId));
+                }
+            }
+            else
+                ctx.discovery().addClientNode(req.cacheName(), clientNodeId, false);
+        }
+    }
+
+    /**
      * Dynamically starts cache using template configuration.
      *
      * @param cacheName Cache name.

http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9d5fdca..19b461e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -260,6 +260,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** {@inheritDoc} */
     @Override public void onReconnected() {
         startFut = new GridFutureAdapter<>();
+
+        long topVer0 = cctx.kernalContext().discovery().topologyVersion();
+
+        assert topVer0 > 0 : topVer0;
+
+        topVer.set(topVer0);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 3a6d04f..0c1df7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal;
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -199,15 +201,28 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
      */
     protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC)
         throws Exception {
-        final TestTcpDiscoverySpi clientSpi = spi(client);
+        reconnectClientNodes(Collections.singletonList(client), srv, disconnectedC);
+    }
+
+    /**
+     * Reconnect client node.
+     *
+     * @param clients Clients.
+     * @param srv Server.
+     * @param disconnectedC Closure which will be run when client node disconnected.
+     * @throws Exception If failed.
+     */
+    protected void reconnectClientNodes(List<Ignite> clients, Ignite srv, @Nullable Runnable disconnectedC)
+        throws Exception {
         final TestTcpDiscoverySpi srvSpi = spi(srv);
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(1);
-        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+        final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
+        final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());
 
         log.info("Block reconnect.");
 
-        clientSpi.writeLatch = new CountDownLatch(1);
+        for (Ignite client : clients)
+            spi(client).writeLatch = new CountDownLatch(1);
 
         IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
             @Override public boolean apply(Event evt) {
@@ -226,9 +241,11 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
             }
         };
 
-        client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+        for (Ignite client : clients)
+            client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
 
-        srvSpi.failNode(client.cluster().localNode().id(), null);
+        for (Ignite client : clients)
+            srvSpi.failNode(client.cluster().localNode().id(), null);
 
         waitReconnectEvent(disconnectLatch);
 
@@ -237,11 +254,13 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
 
         log.info("Allow reconnect.");
 
-        clientSpi.writeLatch.countDown();
+        for (Ignite client : clients)
+            spi(client).writeLatch.countDown();
 
         waitReconnectEvent(reconnectLatch);
 
-        client.events().stopLocalListen(p);
+        for (Ignite client : clients)
+            client.events().stopLocalListen(p);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 807027c..edd95e9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -998,6 +999,159 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectClusterRestartMultinode() throws Exception {
+        clientMode = true;
+
+        final int CLIENTS = 5;
+
+        CountDownLatch disconnectLatch = new CountDownLatch(CLIENTS);
+        CountDownLatch reconnectLatch = new CountDownLatch(CLIENTS);
+
+        List<IgniteCache> caches = new ArrayList<>();
+
+        for (int i = 0; i < CLIENTS; i++) {
+            Ignite client = startGrid(SRV_CNT + i);
+
+            addListener(client, disconnectLatch, reconnectLatch);
+
+            IgniteCache cache = client.getOrCreateCache(new CacheConfiguration<>());
+
+            assertNotNull(cache);
+
+            caches.add(cache);
+        }
+
+        for (int i = 0; i < SRV_CNT; i++)
+            stopGrid(i);
+
+        assertTrue(disconnectLatch.await(30_000, MILLISECONDS));
+
+        log.info("Restart servers.");
+
+        clientMode = false;
+
+        startGridsMultiThreaded(0, SRV_CNT);
+
+        assertTrue(reconnectLatch.await(30_000, MILLISECONDS));
+
+        for (final IgniteCache clientCache : caches) {
+            GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    return clientCache.get(1);
+                }
+            }, IllegalStateException.class, null);
+        }
+
+        for (int i = 0; i < SRV_CNT + CLIENTS; i++) {
+            Ignite ignite = grid(i);
+
+            ClusterGroup grp = ignite.cluster().forCacheNodes(null);
+
+            assertEquals(0, grp.nodes().size());
+
+            grp = ignite.cluster().forClientNodes(null);
+
+            assertEquals(0, grp.nodes().size());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectMultinode() throws Exception {
+        grid(0).createCache(new CacheConfiguration<>());
+
+        clientMode = true;
+
+        final int CLIENTS = 2;
+
+        List<Ignite> clients = new ArrayList<>();
+
+        for (int i = 0; i < CLIENTS; i++) {
+            Ignite client = startGrid(SRV_CNT + i);
+
+            assertNotNull(client.getOrCreateCache(new CacheConfiguration<>()));
+
+            clients.add(client);
+        }
+
+        int nodes = SRV_CNT + CLIENTS;
+        int srvNodes = SRV_CNT;
+
+        for (int iter = 0; iter < 3; iter++) {
+            log.info("Iteration: " + iter);
+
+            reconnectClientNodes(clients, grid(0), null);
+
+            for (Ignite client : clients) {
+                IgniteCache<Object, Object> cache = client.cache(null);
+
+                assertNotNull(cache);
+
+                cache.put(client.name(), 1);
+
+                assertEquals(1, cache.get(client.name()));
+
+                ClusterGroup grp = client.cluster().forCacheNodes(null);
+
+                assertEquals(CLIENTS + srvNodes, grp.nodes().size());
+
+                grp = client.cluster().forClientNodes(null);
+
+                assertEquals(CLIENTS, grp.nodes().size());
+            }
+
+            for (int i = 0; i < nodes; i++) {
+                Ignite ignite = grid(i);
+
+                ClusterGroup grp = ignite.cluster().forCacheNodes(null);
+
+                assertEquals(CLIENTS + srvNodes, grp.nodes().size());
+
+                grp = ignite.cluster().forClientNodes(null);
+
+                assertEquals(CLIENTS, grp.nodes().size());
+            }
+
+            clientMode = false;
+
+            startGrid(nodes++);
+
+            srvNodes++;
+
+            clientMode = true;
+
+            startGrid(nodes++);
+        }
+    }
+
+    /**
+     * @param client Client.
+     * @param disconnectLatch Disconnect event latch.
+     * @param reconnectLatch Reconnect event latch.
+     */
+    private void addListener(Ignite client, final CountDownLatch disconnectLatch, final CountDownLatch reconnectLatch) {
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+    }
+
+    /**
      *
      */
     static class TestClass1 implements Serializable {}