You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/03 16:42:48 UTC
[4/5] incubator-ignite git commit: # ignite-901 WIP
# ignite-901 WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/59b967aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/59b967aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/59b967aa
Branch: refs/heads/ignite-901
Commit: 59b967aa973fc365cb7514e112194c7689982adc
Parents: 363e161
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 3 17:42:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 3 17:42:18 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 3 +-
.../ignite/internal/GridPluginComponent.java | 2 +-
.../apache/ignite/internal/IgniteKernal.java | 6 +-
.../internal/managers/GridManagerAdapter.java | 2 +-
.../deployment/GridDeploymentManager.java | 2 +-
.../discovery/GridDiscoveryManager.java | 5 +-
.../processors/GridProcessorAdapter.java | 2 +-
.../cache/DynamicCacheChangeBatch.java | 17 ++
.../processors/cache/GridCacheProcessor.java | 186 +++++++++++++------
.../datastructures/DataStructuresProcessor.java | 7 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 11 +-
.../IgniteClientReconnectAbstractTest.java | 8 +-
.../IgniteClientReconnectCacheTest.java | 144 +++++++++++++-
13 files changed, 317 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 705576e..fb0a157 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -126,7 +126,8 @@ public interface GridComponent {
public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException;
/**
+ * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
* @throws IgniteCheckedException If failed.
*/
- public void onReconnected() throws IgniteCheckedException;
+ public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index 9639df0..55a84c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -70,7 +70,7 @@ public class GridPluginComponent implements GridComponent {
}
/** {@inheritDoc} */
- @Override public void onReconnected() {
+ @Override public void onReconnected(boolean clusterRestarted) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index f97a1c4..5876288 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2848,14 +2848,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/**
- *
+ * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
*/
- public void reconnected() {
+ public void reconnected(boolean clusterRestarted) {
Throwable err = null;
try {
for (GridComponent comp : ctx.components())
- comp.onReconnected();
+ comp.onReconnected(clusterRestarted);
ctx.gateway().onReconnected();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index b0a46eb..1cbe68d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -172,7 +172,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
}
/** {@inheritDoc} */
- @Override public void onReconnected() throws IgniteCheckedException {
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
index 9eda2eb..9e418a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
@@ -119,7 +119,7 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
}
/** {@inheritDoc} */
- @Override public void onReconnected() throws IgniteCheckedException {
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
storesOnKernalStart();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a8af43b..f95788a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -296,6 +296,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
locJoinEvt = new GridFutureAdapter<>();
+
+ registeredCaches.clear();
}
/** {@inheritDoc} */
@@ -1906,7 +1908,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
case EVT_CLIENT_NODE_RECONNECTED: {
assert localNode().isClient() : evt;
- ((IgniteKernal)ctx.grid()).reconnected();
+ // TODO IGNITE-901: pass the actual cluster-restarted flag instead of the hard-coded false.
+ ((IgniteKernal)ctx.grid()).reconnected(false);
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index 1a6791b..8baf95c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -68,7 +68,7 @@ public abstract class GridProcessorAdapter implements GridProcessor {
}
/** {@inheritDoc} */
- @Override public void onReconnected() throws IgniteCheckedException {
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index dfc39c1..1e8184d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -43,6 +43,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
/** Custom message ID. */
private IgniteUuid id = IgniteUuid.randomUuid();
+ /** {@code True} if this is discovery data sent on client reconnect. */
+ private boolean clientReconnect;
+
/**
* @param reqs Requests.
*/
@@ -93,6 +96,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
return false;
}
+ /**
+ * @param clientReconnect {@code True} if this is discovery data sent on client reconnect.
+ */
+ public void clientReconnect(boolean clientReconnect) {
+ this.clientReconnect = clientReconnect;
+ }
+
+ /**
+ * @return {@code True} if this is discovery data sent on client reconnect.
+ */
+ public boolean clientReconnect() {
+ return clientReconnect;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeBatch.class, this);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 8d3f8da..4fc02d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -127,6 +127,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Count down latch for caches. */
private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
+ /** Copy of registered caches taken on disconnect, reset to {@code null} on reconnect. */
+ private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
/**
* @param ctx Kernal context.
*/
@@ -914,6 +917,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ cachesOnDisconnect = new HashMap<>(registeredCaches);
+
+ registeredCaches.clear();
+
+ registeredTemplates.clear();
+
for (GridCacheAdapter cache : caches.values())
cache.context().gate().onDisconnected(reconnectFut);
@@ -922,15 +931,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (GridCacheAdapter cache : caches.values())
cache.disconnected();
- registeredCaches.clear();
-
sharedCtx.onDisconnected();
}
/** {@inheritDoc} */
- @Override public void onReconnected() throws IgniteCheckedException {
- for (GridCacheAdapter cache : caches.values())
- cache.context().gate().reconnected(false);
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ cachesOnDisconnect = null;
+
+ for (GridCacheAdapter cache : caches.values()) {
+ boolean stopped = !registeredCaches.containsKey(maskNull(cache.name()));
+
+ cache.context().gate().reconnected(stopped);
+
+ if (stopped) {
+ sharedCtx.removeCacheContext(cache.ctx);
+
+ caches.remove(maskNull(cache.name()));
+ jCacheProxies.remove(maskNull(cache.name()));
+
+ onKernalStop(cache, true);
+ stopCache(cache, true);
+ }
+ }
marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
@Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
@@ -1690,11 +1712,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
- // Collect dynamically started caches to a single object.
Collection<DynamicCacheChangeRequest> reqs =
+ // Collect dynamically started caches to a single object.
new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
- for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
+
+ Map<String, DynamicCacheDescriptor> descs = reconnect ? cachesOnDisconnect : registeredCaches;
+
+ for (DynamicCacheDescriptor desc : descs.values()) {
if (!desc.cancelled()) {
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
@@ -1722,7 +1748,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheChangeBatch req = new DynamicCacheChangeBatch(reqs);
- req.clientNodes(ctx.discovery().clientNodesMap());
+ Map<String, Map<UUID, Boolean>> clientNodesMap = ctx.discovery().clientNodesMap();
+
+ if (reconnect) {
+ clientNodesMap = U.newHashMap(caches.size());
+
+ for (GridCacheAdapter<?, ?> cache : caches.values()) {
+ Boolean nearEnabled = cache.isNear();
+
+ Map<UUID, Boolean> map = U.newHashMap(1);
+
+ map.put(nodeId, nearEnabled);
+
+ clientNodesMap.put(cache.name(), map);
+ }
+ }
+
+ req.clientNodes(clientNodesMap);
+
+ req.clientReconnect(reconnect);
return req;
}
@@ -1732,38 +1776,86 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (data instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
- for (DynamicCacheChangeRequest req : batch.requests()) {
- if (req.template()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
+ if (batch.clientReconnect()) {
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ assert !req.template() : req;
- assert ccfg != null : req;
+ String name = req.cacheName();
- DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
+ boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
- if (existing == null) {
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
- ctx,
- ccfg,
- req.cacheType(),
- true,
- req.deploymentId());
+ if (!sysCache) {
+ DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
- registeredTemplates.put(maskNull(req.cacheName()), desc);
- }
+ if (desc != null && !desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) {
+ Map<UUID, Boolean> nodes = batch.clientNodes().get(name);
- continue;
+ assert nodes != null : req;
+ assert nodes.containsKey(joiningNodeId) : nodes;
+
+ ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, nodes.get(joiningNodeId));
+ }
+ }
+ else
+ ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, false);
}
+ }
+ else {
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ if (req.template()) {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
+
+ assert ccfg != null : req;
+
+ DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
+
+ if (existing == null) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
+ ccfg,
+ req.cacheType(),
+ true,
+ req.deploymentId());
+
+ registeredTemplates.put(maskNull(req.cacheName()), desc);
+ }
- DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
+ continue;
+ }
+
+ DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
+
+ if (req.start() && !req.clientStartOnly()) {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
+
+ if (existing != null) {
+ if (existing.locallyConfigured()) {
+ existing.deploymentId(req.deploymentId());
+
+ existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+
+ ctx.discovery().setCacheFilter(
+ req.cacheName(),
+ ccfg.getNodeFilter(),
+ ccfg.getNearConfiguration() != null,
+ ccfg.getCacheMode() == LOCAL);
+ }
+ }
+ else {
+ assert req.cacheType() != null : req;
- if (req.start() && !req.clientStartOnly()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
+ ccfg,
+ req.cacheType(),
+ false,
+ req.deploymentId());
- if (existing != null) {
- if (existing.locallyConfigured()) {
- existing.deploymentId(req.deploymentId());
+ // Received statically configured cache.
+ if (req.initiatingNodeId() == null)
+ desc.staticallyConfigured(true);
- existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+ registeredCaches.put(maskNull(req.cacheName()), desc);
ctx.discovery().setCacheFilter(
req.cacheName(),
@@ -1772,37 +1864,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ccfg.getCacheMode() == LOCAL);
}
}
- else {
- assert req.cacheType() != null : req;
-
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
- ctx,
- ccfg,
- req.cacheType(),
- false,
- req.deploymentId());
-
- // Received statically configured cache.
- if (req.initiatingNodeId() == null)
- desc.staticallyConfigured(true);
-
- registeredCaches.put(maskNull(req.cacheName()), desc);
-
- ctx.discovery().setCacheFilter(
- req.cacheName(),
- ccfg.getNodeFilter(),
- ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode() == LOCAL);
- }
}
- }
- if (!F.isEmpty(batch.clientNodes())) {
- for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
- String cacheName = entry.getKey();
+ if (!F.isEmpty(batch.clientNodes())) {
+ for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
+ String cacheName = entry.getKey();
- for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
- ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+ for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+ ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 95c9563..4637bd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -186,14 +186,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onReconnected() throws IgniteCheckedException {
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
Set<GridCacheInternal> keys = dsMap.keySet();
Map<GridCacheInternal, GridCacheInternal> vals = dsView.getAll(keys);
for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) {
- if (!vals.containsKey(e.getKey()))
+ if (!vals.containsKey(e.getKey())) {
+ dsMap.remove(e.getKey());
+
e.getValue().onRemoved();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 578aae8..f3f19bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1084,6 +1084,9 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private Reconnector reconnector;
+ /** */
+ private boolean nodeAdded;
+
/**
*
*/
@@ -1286,12 +1289,16 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.stats.onMessageProcessingFinished(msg);
}
- private boolean nodeAdded;
-
+ /**
+ * @return {@code True} if the client is in the process of joining (state is {@code STARTING} or {@code DISCONNECTED}).
+ */
private boolean joining() {
return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED;
}
+ /**
+ * @return {@code True} if disconnected.
+ */
private boolean disconnected() {
return state == ClientImpl.State.DISCONNECTED;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index a9ce136..0f8aadd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -165,10 +165,10 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
*
* @param client Client.
* @param srv Server.
- * @param disconnectedClosure Closure which will be run when client node disconnected.
+ * @param disconnectedC Closure which will be run when client node disconnected.
* @throws Exception If failed.
*/
- protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedClosure)
+ protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC)
throws Exception {
final TestTcpDiscoverySpi clientSpi = spi(client);
final TestTcpDiscoverySpi srvSpi = spi(srv);
@@ -201,8 +201,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
assertTrue(disconnectLatch.await(5000, MILLISECONDS));
- if (disconnectedClosure != null)
- disconnectedClosure.run();
+ if (disconnectedC != null)
+ disconnectedC.run();
log.info("Allow reconnect.");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 6a77a18..258eef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -100,7 +101,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
public void testReconnect() throws Exception {
clientMode = true;
- Ignite client = startGrid(SRV_CNT);
+ IgniteEx client = startGrid(SRV_CNT);
final TestTcpDiscoverySpi clientSpi = spi(client);
@@ -110,6 +111,16 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>());
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName("nearCache");
+
+ final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>());
+
+ nearCache.put(1, 1);
+
+ assertEquals(1, nearCache.localPeek(1));
+
cache.put(1, 1);
final CountDownLatch disconnectLatch = new CountDownLatch(1);
@@ -141,7 +152,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
catch (CacheException e) {
log.info("Expected exception: " + e);
- IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();
+ IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException) e.getCause();
e0.reconnectFuture().get();
}
@@ -187,6 +198,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
+ checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+ checkCacheDiscoveryData(srv, client, "nearCache", true, true, true);
+
assertEquals(1, cache.get(1));
putFut.get();
@@ -197,6 +212,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
assertEquals(3, cache.get(3));
+ assertNull(nearCache.localPeek(1));
+
this.clientMode = false;
IgniteEx srv2 = startGrid(SRV_CNT + 1);
@@ -206,6 +223,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
cache.put(key, 4);
assertEquals(4, cache.get(key));
+
+ checkCacheDiscoveryData(srv2, client, null, true, true, false);
+
+ checkCacheDiscoveryData(srv2, client, "nearCache", true, true, true);
}
/**
@@ -231,7 +252,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
final CountDownLatch reconnectLatch = new CountDownLatch(1);
client.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
+ @Override
+ public boolean apply(Event evt) {
if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
info("Reconnected: " + evt);
@@ -439,6 +461,74 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}
/**
+ * @throws Exception If failed.
+ */
+ public void testReconnectCacheDestroyed() throws Exception {
+ clientMode = true;
+
+ final IgniteEx client = startGrid(SRV_CNT);
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(client);
+
+ final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ srv.destroyCache(null);
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientCache.get(1);
+ }
+ }, IllegalStateException.class, null);
+
+ checkCacheDiscoveryData(srv, client, null, false, false, false);
+
+ IgniteCache<Object, Object> clientCache0 = client.getOrCreateCache(new CacheConfiguration<>());
+
+ checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+ clientCache0.put(1, 1);
+
+ assertEquals(1, clientCache0.get(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectCacheDestroyedAndCreated() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(SRV_CNT);
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(client);
+
+ final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override
+ public void run() {
+ srv.destroyCache(null);
+
+ srv.getOrCreateCache(new CacheConfiguration<>());
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ return clientCache.get(1);
+ }
+ }, IllegalStateException.class, null);
+ }
+
+ /**
* @param client Client.
* @param ccfg Cache configuration.
* @param msgToBlock Message to block.
@@ -501,6 +591,54 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}
/**
+ * @param srv Server node.
+ * @param client Client node.
+ * @param cacheName Cache name.
+ * @param cacheExists {@code True} if the cache is expected to exist.
+ * @param clientCache {@code True} if client node has client cache.
+ * @param clientNear {@code True} if client node has near-enabled client cache.
+ */
+ private void checkCacheDiscoveryData(Ignite srv,
+ Ignite client,
+ String cacheName,
+ boolean cacheExists,
+ boolean clientCache,
+ boolean clientNear)
+ {
+ GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery();
+ GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery();
+
+ ClusterNode srvNode = ((IgniteKernal)srv).localNode();
+ ClusterNode clientNode = ((IgniteKernal)client).localNode();
+
+ assertFalse(srvDisco.cacheAffinityNode(clientNode, cacheName));
+ assertFalse(clientDisco.cacheAffinityNode(clientNode, cacheName));
+
+ assertEquals(cacheExists, srvDisco.cacheAffinityNode(srvNode, cacheName));
+
+ if (clientNear)
+ assertTrue(srvDisco.cacheNearNode(clientNode, cacheName));
+ else
+ assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName));
+
+ assertEquals(cacheExists, clientDisco.cacheAffinityNode(srvNode, cacheName));
+
+ if (clientNear)
+ assertTrue(clientDisco.cacheNearNode(clientNode, cacheName));
+ else
+ assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName));
+
+ if (cacheExists) {
+ assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+ assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+ }
+ else {
+ assertTrue(client.cluster().forClientNodes(cacheName).nodes().isEmpty());
+ assertTrue(srv.cluster().forClientNodes(cacheName).nodes().isEmpty());
+ }
+ }
+
+ /**
*
*/
private static class TestCommunicationSpi extends TcpCommunicationSpi {