You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/12/09 12:24:10 UTC
[28/51] [abbrv] ignite git commit: ignite-1.5 Fixed hang on client reconnect (should not do blocking calls from reconnect callback)
ignite-1.5 Fixed hang on client reconnect (should not do blocking calls from reconnect callback)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5791837
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5791837
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5791837
Branch: refs/heads/ignite-843-rc2
Commit: d5791837890a70e1777b86aab281245701afe1eb
Parents: 3b26859
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 8 12:42:25 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 8 12:42:25 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 3 +-
.../ignite/internal/GridPluginComponent.java | 4 +-
.../apache/ignite/internal/IgniteKernal.java | 18 +++-
.../internal/managers/GridManagerAdapter.java | 5 +-
.../deployment/GridDeploymentManager.java | 5 +-
.../processors/GridProcessorAdapter.java | 5 +-
.../processors/cache/GridCacheContext.java | 6 +-
.../processors/cache/GridCacheProcessor.java | 26 +++++-
.../datastructures/DataStructuresProcessor.java | 4 +-
.../IgniteClientReconnectAbstractTest.java | 95 +++++++++++++++++---
.../IgniteClientReconnectAtomicsTest.java | 57 ++++++++++++
.../IgniteClientReconnectCacheTest.java | 5 +-
.../IgniteClientReconnectCollectionsTest.java | 51 +++++++++++
13 files changed, 254 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 6078c5d..0e234cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -131,6 +131,7 @@ public interface GridComponent {
*
* @param clusterRestarted Cluster restarted flag.
* @throws IgniteCheckedException If failed.
+ * @return Future to wait before completing reconnect future.
*/
- public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException;
+ @Nullable public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index ac2a3a7..89dc243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -70,8 +70,8 @@ public class GridPluginComponent implements GridComponent {
}
/** {@inheritDoc} */
- @Override public void onReconnected(boolean clusterRestarted) {
- // No-op.
+ @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) {
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 87ccf93..ab62c13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -136,6 +136,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridTimerTask;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -3083,16 +3084,27 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
* @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
*/
+ @SuppressWarnings("unchecked")
public void onReconnected(final boolean clusterRestarted) {
Throwable err = null;
try {
ctx.disconnected(false);
- for (GridComponent comp : ctx.components())
- comp.onReconnected(clusterRestarted);
+ GridCompoundFuture<?, ?> reconnectFut = new GridCompoundFuture<>();
+
+ for (GridComponent comp : ctx.components()) {
+ IgniteInternalFuture<?> fut = comp.onReconnected(clusterRestarted);
+
+ if (fut != null)
+ reconnectFut.add((IgniteInternalFuture)fut);
+ }
+
+ reconnectFut.add((IgniteInternalFuture)ctx.cache().context().exchange().reconnectExchangeFuture());
+
+ reconnectFut.markInitialized();
- ctx.cache().context().exchange().reconnectExchangeFuture().listen(new CI1<IgniteInternalFuture<?>>() {
+ reconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
fut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 1fd5bff..21a80c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
@@ -192,9 +193,11 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
}
/** {@inheritDoc} */
- @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
for (T t : spis)
t.onClientReconnected(clusterRestarted);
+
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
index a2da75c..cea1786 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskName;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.deployment.protocol.gg.GridProtocolHandler;
import org.apache.ignite.internal.processors.task.GridInternal;
@@ -123,8 +124,10 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
}
/** {@inheritDoc} */
- @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
storesOnKernalStart();
+
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index f7f42bd..e4896fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteFuture;
@@ -68,8 +69,8 @@ public abstract class GridProcessorAdapter implements GridProcessor {
}
/** {@inheritDoc} */
- @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
- // No-op.
+ @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index d689ba6..07f6b9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -242,7 +242,7 @@ public class GridCacheContext<K, V> implements Externalizable {
private boolean depEnabled;
/** */
- private boolean deferredDelete;
+ private boolean deferredDel;
/**
* Empty constructor required for {@link Externalizable}.
@@ -512,7 +512,7 @@ public class GridCacheContext<K, V> implements Externalizable {
public void cache(GridCacheAdapter<K, V> cache) {
this.cache = cache;
- deferredDelete = cache.isDht() || cache.isDhtAtomic() || cache.isColocated() ||
+ deferredDel = cache.isDht() || cache.isDhtAtomic() || cache.isColocated() ||
(cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC);
}
@@ -576,7 +576,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return {@code True} if entries should not be deleted from cache immediately.
*/
public boolean deferredDelete() {
- return deferredDelete;
+ return deferredDel;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e53f186..02e6403 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -96,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.F0;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -955,10 +956,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());
- for (GridCacheAdapter cache : caches.values()) {
+ GridCompoundFuture<?, ?> stopFut = null;
+
+ for (final GridCacheAdapter cache : caches.values()) {
String name = cache.name();
boolean stopped;
@@ -985,8 +988,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
caches.remove(maskNull(cache.name()));
jCacheProxies.remove(maskNull(cache.name()));
- onKernalStop(cache, true);
- stopCache(cache, true);
+ IgniteInternalFuture<?> fut = ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ onKernalStop(cache, true);
+ stopCache(cache, true);
+ }
+ });
+
+ if (stopFut == null)
+ stopFut = new GridCompoundFuture<>();
+
+ stopFut.add((IgniteInternalFuture)fut);
}
else {
cache.onReconnected();
@@ -1008,6 +1020,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cache.context().gate().reconnected(false);
cachesOnDisconnect = null;
+
+ if (stopFut != null)
+ stopFut.markInitialized();
+
+ return stopFut;
}
/**
@@ -1200,6 +1217,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param pluginMgr Cache plugin manager.
* @param cacheType Cache type.
* @param cacheObjCtx Cache object context.
+ * @param updatesAllowed Updates allowed flag.
* @return Cache context.
* @throws IgniteCheckedException If failed to create cache.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 9ed9350..51c4067 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -276,7 +276,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) {
GridCacheRemovable obj = e.getValue();
@@ -291,6 +291,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
for (GridCacheContext cctx : ctx.cache().context().cacheContexts())
cctx.dataStructures().onReconnected(clusterRestarted);
+
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 0c1df7f..180047a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal;
import java.io.IOException;
import java.net.Socket;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,6 +101,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
* @throws Exception If failed.
*/
protected void waitReconnectEvent(CountDownLatch latch) throws Exception {
+ waitReconnectEvent(log, latch);
+ }
+
+ /**
+ * @param latch Latch.
+ * @throws Exception If failed.
+ */
+ protected static void waitReconnectEvent(IgniteLogger log, CountDownLatch latch) throws Exception {
if (!latch.await(RECONNECT_TIMEOUT, MILLISECONDS)) {
log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount());
@@ -124,7 +134,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
* @param ignite Node.
* @return Discovery SPI.
*/
- protected TestTcpDiscoverySpi spi(Ignite ignite) {
+ protected static TestTcpDiscoverySpi spi(Ignite ignite) {
return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
}
@@ -201,18 +211,38 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
*/
protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC)
throws Exception {
- reconnectClientNodes(Collections.singletonList(client), srv, disconnectedC);
+ reconnectClientNodes(log, Collections.singletonList(client), srv, disconnectedC);
}
/**
* Reconnect client node.
*
+ * @param log Logger.
+ * @param client Client.
+ * @param srv Server.
+ * @param disconnectedC Closure which will be run when client node disconnected.
+ * @throws Exception If failed.
+ */
+ public static void reconnectClientNode(IgniteLogger log,
+ Ignite client,
+ Ignite srv,
+ @Nullable Runnable disconnectedC)
+ throws Exception {
+ reconnectClientNodes(log, Collections.singletonList(client), srv, disconnectedC);
+ }
+
+ /**
+ * Reconnect client node.
+ *
+ * @param log Logger.
* @param clients Clients.
* @param srv Server.
* @param disconnectedC Closure which will be run when client node disconnected.
* @throws Exception If failed.
*/
- protected void reconnectClientNodes(List<Ignite> clients, Ignite srv, @Nullable Runnable disconnectedC)
+ protected static void reconnectClientNodes(final IgniteLogger log,
+ List<Ignite> clients, Ignite srv,
+ @Nullable Runnable disconnectedC)
throws Exception {
final TestTcpDiscoverySpi srvSpi = spi(srv);
@@ -227,12 +257,12 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- info("Disconnected: " + evt);
+ log.info("Disconnected: " + evt);
disconnectLatch.countDown();
}
else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
+ log.info("Reconnected: " + evt);
reconnectLatch.countDown();
}
@@ -247,7 +277,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
for (Ignite client : clients)
srvSpi.failNode(client.cluster().localNode().id(), null);
- waitReconnectEvent(disconnectLatch);
+ waitReconnectEvent(log, disconnectLatch);
if (disconnectedC != null)
disconnectedC.run();
@@ -257,13 +287,58 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
for (Ignite client : clients)
spi(client).writeLatch.countDown();
- waitReconnectEvent(reconnectLatch);
+ waitReconnectEvent(log, reconnectLatch);
for (Ignite client : clients)
client.events().stopLocalListen(p);
}
/**
+ * @param log Logger.
+ * @param client Client node.
+ * @param srvs Server nodes to stop.
+ * @param srvStartC Closure starting server nodes.
+ * @throws Exception If failed.
+ * @return Restarted servers.
+ */
+ public static Collection<Ignite> reconnectServersRestart(final IgniteLogger log,
+ Ignite client,
+ Collection<Ignite> srvs,
+ Callable<Collection<Ignite>> srvStartC)
+ throws Exception {
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ log.info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ log.info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ for (Ignite srv : srvs)
+ srv.close();
+
+ assertTrue(disconnectLatch.await(30_000, MILLISECONDS));
+
+ Collection<Ignite> startedSrvs = srvStartC.call();
+
+ assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+ return startedSrvs;
+ }
+
+ /**
* @param e Client disconnected exception.
* @return Reconnect future.
*/
@@ -303,7 +378,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
/**
*
*/
- protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ public static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** */
volatile CountDownLatch writeLatch;
@@ -342,7 +417,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
private IgniteLogger log;
/** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
Class msgCls0 = msgCls;
@@ -356,7 +431,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
return;
}
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index c46b5c8..13cac81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal;
+import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
@@ -47,6 +49,61 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
/**
* @throws Exception If failed.
*/
+ public void testAtomicsReconnectClusterRestart() throws Exception {
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final IgniteAtomicLong atomicLong = client.atomicLong("atomicLong", 1L, true);
+ final IgniteAtomicReference<Integer> atomicRef = client.atomicReference("atomicRef", 1, true);
+ final IgniteAtomicStamped<Integer, Integer> atomicStamped = client.atomicStamped("atomicStamped", 1, 1, true);
+ final IgniteCountDownLatch latch = client.countDownLatch("latch", 1, true, true);
+ final IgniteAtomicSequence seq = client.atomicSequence("seq", 1L, true);
+
+ Ignite srv = grid(0);
+
+ reconnectServersRestart(log, client, Collections.singleton(srv), new Callable<Collection<Ignite>>() {
+ @Override public Collection<Ignite> call() throws Exception {
+ return Collections.singleton((Ignite)startGrid(0));
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ atomicStamped.compareAndSet(1, 1, 2, 2);
+
+ return null;
+ }
+ }, IllegalStateException.class, null);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ atomicRef.compareAndSet(1, 2);
+
+ return null;
+ }
+ }, IllegalStateException.class, null);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ atomicLong.incrementAndGet();
+
+ return null;
+ }
+ }, IllegalStateException.class, null);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ seq.getAndAdd(1L);
+
+ return null;
+ }
+ }, IllegalStateException.class, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAtomicSeqReconnect() throws Exception {
Ignite client = grid(serverCount());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 14a770a..05da0b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -971,7 +971,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
info("Disconnected: " + evt);
disconnectLatch.countDown();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
info("Reconnected: " + evt);
reconnectLatch.countDown();
@@ -1096,7 +1097,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
for (int iter = 0; iter < 3; iter++) {
log.info("Iteration: " + iter);
- reconnectClientNodes(clients, grid(0), null);
+ reconnectClientNodes(log, clients, grid(0), null);
for (Ignite client : clients) {
IgniteCache<Object, Object> cache = client.cache(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5791837/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index f6f038d..100e8de 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal;
+import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
@@ -49,6 +51,55 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
/**
* @throws Exception If failed.
*/
+ public void testCollectionsReconnectClusterRestart() throws Exception {
+ CollectionConfiguration colCfg = new CollectionConfiguration();
+
+ colCfg.setCacheMode(PARTITIONED);
+ colCfg.setAtomicityMode(TRANSACTIONAL);
+
+ Ignite client = grid(serverCount());
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final IgniteQueue<Object> queue = client.queue("q", 0, colCfg);
+ final IgniteSet<Object> set = client.set("s", colCfg);
+
+ Ignite srv = grid(0);
+
+ reconnectServersRestart(log, client, Collections.singleton(srv), new Callable<Collection<Ignite>>() {
+ @Override public Collection<Ignite> call() throws Exception {
+ return Collections.singleton((Ignite)startGrid(0));
+ }
+ });
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ queue.add(1);
+
+ return null;
+ }
+ }, IllegalStateException.class, null);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ set.add(1);
+
+ return null;
+ }
+ }, IllegalStateException.class, null);
+
+ try (IgniteQueue<Object> queue2 = client.queue("q", 0, colCfg)) {
+ queue2.add(1);
+ }
+
+ try (IgniteSet<Object> set2 = client.set("s", colCfg)) {
+ set2.add(1);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testQueueReconnect() throws Exception {
CollectionConfiguration colCfg = new CollectionConfiguration();