You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/30 14:49:09 UTC
[2/8] ignite git commit: ignite-1564 Fixed client cache reconnect issues
ignite-1564 Fixed client cache reconnect issues
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/273f291d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/273f291d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/273f291d
Branch: refs/heads/ignite-1534
Commit: 273f291d9fac0919d57b9ed732564b323a956f90
Parents: cd43967
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 30 13:48:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 30 13:48:48 2015 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 22 +--
.../processors/cache/GridCacheEventManager.java | 12 +-
.../processors/cache/GridCacheProcessor.java | 68 +++++---
.../dht/preloader/GridDhtPreloader.java | 6 +
.../IgniteClientReconnectAbstractTest.java | 35 ++++-
.../IgniteClientReconnectCacheTest.java | 154 +++++++++++++++++++
6 files changed, 249 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 785613d..aec36a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -78,7 +78,7 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
+import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -208,7 +208,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Topology cache history. */
private final Map<AffinityTopologyVersion, DiscoCache> discoCacheHist =
- new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE, DISCOVERY_HISTORY_SIZE, 0.7f, 1);
+ new GridBoundedConcurrentOrderedMap<>(DISCOVERY_HISTORY_SIZE);
/** Topology snapshots history. */
private volatile Map<Long, Collection<ClusterNode>> topHist = new HashMap<>();
@@ -465,14 +465,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
final Map<Long, Collection<ClusterNode>> snapshots,
@Nullable DiscoverySpiCustomMessage spiCustomMsg
) {
- if (type == EVT_NODE_JOINED && node.isLocal() && ctx.clientDisconnected()) {
- discoCacheHist.clear();
-
- topHist.clear();
-
- topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, null));
- }
-
DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
: ((CustomMessageWrapper)spiCustomMsg).delegate();
@@ -593,6 +585,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
locJoinEvt = new GridFutureAdapter<>();
registeredCaches.clear();
+
+ discoCacheHist.clear();
+
+ topHist.clear();
+
+ topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
+ new DiscoCache(locNode, Collections.<ClusterNode>emptySet())));
}
else if (type == EVT_CLIENT_NODE_RECONNECTED) {
assert locNode.isClient() : locNode;
@@ -620,7 +619,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
return;
}
- discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
+ if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected())
+ discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index c2f8f3f..751c316 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -40,14 +40,6 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
* Cache event manager.
*/
public class GridCacheEventManager extends GridCacheManagerAdapter {
- /** Local node ID. */
- private UUID locNodeId;
-
- /** {@inheritDoc} */
- @Override public void start0() {
- locNodeId = cctx.localNodeId();
- }
-
/**
* Adds local event listener.
*
@@ -96,7 +88,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
{
addEvent(part,
key,
- locNodeId,
+ cctx.localNodeId(),
tx,
owner,
type,
@@ -116,7 +108,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
addEvent(
0,
null,
- locNodeId,
+ cctx.localNodeId(),
(IgniteUuid)null,
null,
type,
http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c86dfd9..6c13399 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -26,6 +26,7 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
@@ -197,6 +198,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** */
private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+ /** */
+ private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs;
+
/**
* @param ctx Kernal context.
*/
@@ -1050,6 +1054,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
+ if (clientReconnectReqs != null) {
+ for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet())
+ processClientReconnectData(e.getKey(), e.getValue());
+
+ clientReconnectReqs = null;
+ }
+
sharedCtx.onReconnected();
for (GridCacheAdapter cache : reconnected)
@@ -1881,28 +1892,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
if (batch.clientReconnect()) {
- for (DynamicCacheChangeRequest req : batch.requests()) {
- assert !req.template() : req;
-
- String name = req.cacheName();
-
- boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+ if (ctx.clientDisconnected()) {
+ if (clientReconnectReqs == null)
+ clientReconnectReqs = new LinkedHashMap<>();
- if (!sysCache) {
- DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
+ clientReconnectReqs.put(joiningNodeId, batch);
- if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
- Map<UUID, Boolean> nodes = batch.clientNodes().get(name);
-
- assert nodes != null : req;
- assert nodes.containsKey(joiningNodeId) : nodes;
-
- ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, nodes.get(joiningNodeId));
- }
- }
- else
- ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, false);
+ return;
}
+
+ processClientReconnectData(joiningNodeId, batch);
}
else {
for (DynamicCacheChangeRequest req : batch.requests()) {
@@ -1983,6 +1982,37 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @param clientNodeId Client node ID.
+ * @param batch Cache change batch.
+ */
+ private void processClientReconnectData(UUID clientNodeId, DynamicCacheChangeBatch batch) {
+ assert batch.clientReconnect() : batch;
+
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ assert !req.template() : req;
+
+ String name = req.cacheName();
+
+ boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+
+ if (!sysCache) {
+ DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
+
+ if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
+ Map<UUID, Boolean> nodes = batch.clientNodes().get(name);
+
+ assert nodes != null : req;
+ assert nodes.containsKey(clientNodeId) : nodes;
+
+ ctx.discovery().addClientNode(req.cacheName(), clientNodeId, nodes.get(clientNodeId));
+ }
+ }
+ else
+ ctx.discovery().addClientNode(req.cacheName(), clientNodeId, false);
+ }
+ }
+
+ /**
* Dynamically starts cache using template configuration.
*
* @param cacheName Cache name.
http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9d5fdca..19b461e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -260,6 +260,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void onReconnected() {
startFut = new GridFutureAdapter<>();
+
+ long topVer0 = cctx.kernalContext().discovery().topologyVersion();
+
+ assert topVer0 > 0 : topVer0;
+
+ topVer.set(topVer0);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 3a6d04f..0c1df7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal;
import java.io.IOException;
import java.net.Socket;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -199,15 +201,28 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
*/
protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC)
throws Exception {
- final TestTcpDiscoverySpi clientSpi = spi(client);
+ reconnectClientNodes(Collections.singletonList(client), srv, disconnectedC);
+ }
+
+ /**
+ * Reconnect client nodes.
+ *
+ * @param clients Clients.
+ * @param srv Server.
+ * @param disconnectedC Closure which will be run when the client nodes are disconnected.
+ * @throws Exception If failed.
+ */
+ protected void reconnectClientNodes(List<Ignite> clients, Ignite srv, @Nullable Runnable disconnectedC)
+ throws Exception {
final TestTcpDiscoverySpi srvSpi = spi(srv);
- final CountDownLatch disconnectLatch = new CountDownLatch(1);
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
+ final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());
log.info("Block reconnect.");
- clientSpi.writeLatch = new CountDownLatch(1);
+ for (Ignite client : clients)
+ spi(client).writeLatch = new CountDownLatch(1);
IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
@@ -226,9 +241,11 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
}
};
- client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+ for (Ignite client : clients)
+ client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
- srvSpi.failNode(client.cluster().localNode().id(), null);
+ for (Ignite client : clients)
+ srvSpi.failNode(client.cluster().localNode().id(), null);
waitReconnectEvent(disconnectLatch);
@@ -237,11 +254,13 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
log.info("Allow reconnect.");
- clientSpi.writeLatch.countDown();
+ for (Ignite client : clients)
+ spi(client).writeLatch.countDown();
waitReconnectEvent(reconnectLatch);
- client.events().stopLocalListen(p);
+ for (Ignite client : clients)
+ client.events().stopLocalListen(p);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/273f291d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 807027c..edd95e9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -998,6 +999,159 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}
/**
+ * @throws Exception If failed.
+ */
+ public void testReconnectClusterRestartMultinode() throws Exception {
+ clientMode = true;
+
+ final int CLIENTS = 5;
+
+ CountDownLatch disconnectLatch = new CountDownLatch(CLIENTS);
+ CountDownLatch reconnectLatch = new CountDownLatch(CLIENTS);
+
+ List<IgniteCache> caches = new ArrayList<>();
+
+ for (int i = 0; i < CLIENTS; i++) {
+ Ignite client = startGrid(SRV_CNT + i);
+
+ addListener(client, disconnectLatch, reconnectLatch);
+
+ IgniteCache cache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ assertNotNull(cache);
+
+ caches.add(cache);
+ }
+
+ for (int i = 0; i < SRV_CNT; i++)
+ stopGrid(i);
+
+ assertTrue(disconnectLatch.await(30_000, MILLISECONDS));
+
+ log.info("Restart servers.");
+
+ clientMode = false;
+
+ startGridsMultiThreaded(0, SRV_CNT);
+
+ assertTrue(reconnectLatch.await(30_000, MILLISECONDS));
+
+ for (final IgniteCache clientCache : caches) {
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientCache.get(1);
+ }
+ }, IllegalStateException.class, null);
+ }
+
+ for (int i = 0; i < SRV_CNT + CLIENTS; i++) {
+ Ignite ignite = grid(i);
+
+ ClusterGroup grp = ignite.cluster().forCacheNodes(null);
+
+ assertEquals(0, grp.nodes().size());
+
+ grp = ignite.cluster().forClientNodes(null);
+
+ assertEquals(0, grp.nodes().size());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectMultinode() throws Exception {
+ grid(0).createCache(new CacheConfiguration<>());
+
+ clientMode = true;
+
+ final int CLIENTS = 2;
+
+ List<Ignite> clients = new ArrayList<>();
+
+ for (int i = 0; i < CLIENTS; i++) {
+ Ignite client = startGrid(SRV_CNT + i);
+
+ assertNotNull(client.getOrCreateCache(new CacheConfiguration<>()));
+
+ clients.add(client);
+ }
+
+ int nodes = SRV_CNT + CLIENTS;
+ int srvNodes = SRV_CNT;
+
+ for (int iter = 0; iter < 3; iter++) {
+ log.info("Iteration: " + iter);
+
+ reconnectClientNodes(clients, grid(0), null);
+
+ for (Ignite client : clients) {
+ IgniteCache<Object, Object> cache = client.cache(null);
+
+ assertNotNull(cache);
+
+ cache.put(client.name(), 1);
+
+ assertEquals(1, cache.get(client.name()));
+
+ ClusterGroup grp = client.cluster().forCacheNodes(null);
+
+ assertEquals(CLIENTS + srvNodes, grp.nodes().size());
+
+ grp = client.cluster().forClientNodes(null);
+
+ assertEquals(CLIENTS, grp.nodes().size());
+ }
+
+ for (int i = 0; i < nodes; i++) {
+ Ignite ignite = grid(i);
+
+ ClusterGroup grp = ignite.cluster().forCacheNodes(null);
+
+ assertEquals(CLIENTS + srvNodes, grp.nodes().size());
+
+ grp = ignite.cluster().forClientNodes(null);
+
+ assertEquals(CLIENTS, grp.nodes().size());
+ }
+
+ clientMode = false;
+
+ startGrid(nodes++);
+
+ srvNodes++;
+
+ clientMode = true;
+
+ startGrid(nodes++);
+ }
+ }
+
+ /**
+ * @param client Client.
+ * @param disconnectLatch Disconnect event latch.
+ * @param reconnectLatch Reconnect event latch.
+ */
+ private void addListener(Ignite client, final CountDownLatch disconnectLatch, final CountDownLatch reconnectLatch) {
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+ }
+
+ /**
*
*/
static class TestClass1 implements Serializable {}