You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/02 16:38:46 UTC

incubator-ignite git commit: # ignite-901 client reconnect WIP

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 f5f3efd16 -> ff3d61fcc


# ignite-901 client reconnect WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ff3d61fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ff3d61fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ff3d61fc

Branch: refs/heads/ignite-901
Commit: ff3d61fcc6ac310c247e2a5c91d91495f3f66a04
Parents: f5f3efd
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 2 13:18:25 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 2 17:38:41 2015 +0300

----------------------------------------------------------------------
 .../IgniteClientDisconnectedException.java      |  33 ++
 .../apache/ignite/internal/GridComponent.java   |   1 +
 ...gniteClientDisconnectedCheckedException.java |  32 ++
 .../IgniteDisconnectedCheckedException.java     |  32 --
 .../apache/ignite/internal/IgniteKernal.java    |  65 ----
 .../processors/cache/GridCacheMvccManager.java  |   2 +-
 .../GridCachePartitionExchangeManager.java      |   6 +-
 .../continuous/GridContinuousHandler.java       |   9 +-
 .../continuous/GridContinuousProcessor.java     |  23 +-
 .../datastructures/DataStructuresProcessor.java |   7 +-
 .../GridCacheCountDownLatchImpl.java            |  49 ++-
 .../service/GridServiceProcessor.java           |   8 +-
 .../processors/service/GridServiceProxy.java    |  10 +-
 .../ignite/internal/util/IgniteUtils.java       |   6 +
 .../spi/discovery/DiscoverySpiDataExchange.java |   3 +-
 .../IgniteClientReconnectAbstractTest.java      |  19 +-
 .../IgniteClientReconnectAtomicsTest.java       | 315 +++++++++++++++++
 .../IgniteClientReconnectCacheTest.java         |   9 +-
 .../IgniteClientReconnectCollectionsTest.java   | 132 +++++++
 .../IgniteClientReconnectComputeTest.java       |  35 ++
 ...eClientReconnectContinuousProcessorTest.java | 350 +++++++++++++++++++
 .../IgniteClientReconnectTestSuite.java         |   4 +
 22 files changed, 1010 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
new file mode 100644
index 0000000..9500ac2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.jetbrains.annotations.*;
+
+/**
+ *
+ */
+public class IgniteClientDisconnectedException extends IgniteException {
+    /**
+     * @param msg Error message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public IgniteClientDisconnectedException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 5b3b0c3..7f0f1b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -87,6 +87,7 @@ public interface GridComponent {
     /**
      * Receives discovery data object from remote nodes (called
      * on new node during discovery process).
+     *
      * @param joiningNodeId Joining node ID.
      * @param rmtNodeId Remote node ID for which data is provided.
      * @param data Discovery data object or {@code null} if nothing was

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java
new file mode 100644
index 0000000..2e999f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+
+/**
+ *
+ */
+public class IgniteClientDisconnectedCheckedException extends IgniteCheckedException {
+    /**
+     * @param msg Message.
+     */
+    public IgniteClientDisconnectedCheckedException(String msg) {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
deleted file mode 100644
index 0684356..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import org.apache.ignite.*;
-
-/**
- *
- */
-public class IgniteDisconnectedCheckedException extends IgniteCheckedException {
-    /**
-     * @param msg Message.
-     */
-    public IgniteDisconnectedCheckedException(String msg) {
-        super(msg);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 821a1f5..c04f327 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2816,71 +2816,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         }
     }
 
-    private void stopOnDisconnect() {
-        GridCacheProcessor cacheProcessor = ctx.cache();
-
-        List<GridComponent> comps = ctx.components();
-
-        // Callback component in reverse order while kernal is still functional
-        // if called in the same thread, at least.
-        for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) {
-            GridComponent comp = it.previous();
-
-            try {
-                if (!skipDaemon(comp) && (!(comp instanceof GridManager)))
-                    comp.onKernalStop(true);
-            }
-            catch (Throwable e) {
-                errOnStop = true;
-
-                U.error(log, "Failed to pre-stop processor: " + comp, e);
-
-                if (e instanceof Error)
-                    throw e;
-            }
-        }
-
-        if (cacheProcessor != null)
-            cacheProcessor.cancelUserOperations();
-
-        for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) {
-            GridComponent comp = it.previous();
-
-            try {
-                if (!skipDaemon(comp) && (!(comp instanceof GridManager))) {
-                    comp.stop(true);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Component stopped: " + comp);
-                }
-            }
-            catch (Throwable e) {
-                errOnStop = true;
-
-                U.error(log, "Failed to stop component (ignoring): " + comp, e);
-
-                if (e instanceof Error)
-                    throw (Error)e;
-            }
-        }
-
-        ctx.marshallerContext().onDisconnected();
-    }
-
-    private void restart() throws IgniteCheckedException {
-        List<PluginProvider> plugins = U.allPluginProviders();
-
-        startProcessor(new ClusterProcessor(ctx));
-
-        GridResourceProcessor rsrcProc = new GridResourceProcessor(ctx);
-
-        rsrcProc.setSpringContext(rsrcCtx);
-
-        scheduler = new IgniteSchedulerImpl(ctx);
-
-        startProcessor(rsrcProc);
-    }
-
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index e2d22dd..9c89041 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -299,7 +299,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     public void cancelClientFutures(boolean stop) {
         IgniteCheckedException e = stop ?
             new IgniteCheckedException("Operation has been cancelled (node is stopping).") :
-            new IgniteCheckedException("Operation has been cancelled (node disconnected).");
+            new IgniteClientDisconnectedCheckedException("Operation has been cancelled (node disconnected).");
 
         for (Collection<GridCacheFuture<?>> futures : futs.values()) {
             for (GridCacheFuture<?> future : futures)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f0c9b3b..3f06e8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -286,7 +286,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     break;
                 }
-                catch (IgniteDisconnectedCheckedException e) {
+                catch (IgniteClientDisconnectedCheckedException e) {
                     log.info("Disconnected while waiting for initial partition map exchange: " + e);
 
                     break;
@@ -331,7 +331,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
 
         IgniteCheckedException err = disconnected ?
-            new IgniteDisconnectedCheckedException("Node disconnected: " + cctx.gridName()) :
+            new IgniteClientDisconnectedCheckedException("Node disconnected: " + cctx.gridName()) :
             new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
 
         // Finish all exchange futures.
@@ -1119,7 +1119,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 catch (IgniteInterruptedCheckedException e) {
                     throw e;
                 }
-                catch (IgniteDisconnectedCheckedException e) {
+                catch (IgniteClientDisconnectedCheckedException e) {
                     return;
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index ce9b7c0..79020da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -33,7 +33,14 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
      * Listener registration status.
      */
     public enum RegisterStatus {
-        REGISTERED, NOT_REGISTERED, DELAYED
+        /** */
+        REGISTERED,
+
+        /** */
+        NOT_REGISTERED,
+
+        /** */
+        DELAYED
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index dd04bf4..7508acd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -318,11 +318,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
-        if (!nodeId.equals(ctx.localNodeId())) {
+        if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
             DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
 
-            // Collect listeners information (will be sent to
-            // joining node during discovery process).
+            // Collect listeners information (will be sent to joining node during discovery process).
             for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
                 UUID routineId = e.getKey();
                 LocalRoutineInfo info = e.getValue();
@@ -333,8 +332,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
             return data;
         }
-        else
-            return null;
+
+        return null;
     }
 
     /** {@inheritDoc} */
@@ -377,6 +376,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * Callback invoked when cache is started.
      *
      * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
      */
     public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException {
         for (Map.Entry<UUID, RemoteRoutineInfo> entry : rmtInfos.entrySet()) {
@@ -630,6 +630,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDisconnected() throws IgniteCheckedException {
+        for (UUID rmtId : rmtInfos.keySet())
+            unregisterRemote(rmtId);
+
+        rmtInfos.clear();
+
+        clientInfos.clear();
+    }
+
     /**
      * @param nodeId Node ID.
      * @param routineId Routine ID.
@@ -637,6 +647,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param toSnd Notification object to send.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param msg If {@code true} then sent data is collection of messages.
      * @throws IgniteCheckedException In case of error.
      */
     private void sendNotification(UUID nodeId,
@@ -1221,6 +1232,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         @GridToStringInclude
         private Collection<DiscoveryDataItem> items;
 
+        /** */
         private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos;
 
         /**
@@ -1232,6 +1244,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         /**
          * @param nodeId Node ID.
+         * @param clientInfos Client information.
          */
         DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
             assert nodeId != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index e6335b6..d4f67fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1001,8 +1001,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                         dsView.put(key, val);
                     }
 
-                    latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(),
-                        val.autoDelete(), key, cntDownLatchView, dsCacheCtx);
+                    latch = new GridCacheCountDownLatchImpl(name, val.initialCount(),
+                        val.autoDelete(),
+                        key,
+                        cntDownLatchView,
+                        dsCacheCtx);
 
                     dsMap.put(key, latch);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 33547d9..6a0f5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -66,9 +66,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     /** Cache context. */
     private GridCacheContext ctx;
 
-    /** Current count. */
-    private int cnt;
-
     /** Initial count. */
     private int initCnt;
 
@@ -95,7 +92,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
      * Constructor.
      *
      * @param name Latch name.
-     * @param cnt Current count.
      * @param initCnt Initial count.
      * @param autoDel Auto delete flag.
      * @param key Latch key.
@@ -103,7 +99,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
      * @param ctx Cache context.
      */
     public GridCacheCountDownLatchImpl(String name,
-        int cnt,
         int initCnt,
         boolean autoDel,
         GridCacheInternalKey key,
@@ -111,14 +106,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
         GridCacheContext ctx)
     {
         assert name != null;
-        assert cnt >= 0;
         assert initCnt >= 0;
         assert key != null;
         assert latchView != null;
         assert ctx != null;
 
         this.name = name;
-        this.cnt = cnt;
         this.initCnt = initCnt;
         this.autoDel = autoDel;
         this.key = key;
@@ -135,7 +128,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
 
     /** {@inheritDoc} */
     @Override public int count() {
-        return cnt;
+        try {
+            return CU.outTx(new GetCountCallable(), ctx);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -211,8 +209,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
 
     /** {@inheritDoc} */
     @Override public boolean onRemoved() {
-        assert cnt == 0;
-
         return rmvd = true;
     }
 
@@ -235,8 +231,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     @Override public void onUpdate(int cnt) {
         assert cnt >= 0;
 
-        this.cnt = cnt;
-
         while (internalLatch != null && internalLatch.getCount() > cnt)
             internalLatch.countDown();
     }
@@ -257,9 +251,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
                                     if (log.isDebugEnabled())
                                         log.debug("Failed to find count down latch with given name: " + name);
 
-                                    assert cnt == 0;
-
-                                    return new CountDownLatch(cnt);
+                                    return new CountDownLatch(0);
                                 }
 
                                 tx.commit();
@@ -342,6 +334,29 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     /**
      *
      */
+    private class GetCountCallable implements Callable<Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            Integer val;
+
+            try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+                GridCacheCountDownLatchValue latchVal = latchView.get(key);
+
+                if (latchVal == null)
+                    throw new IgniteCheckedException("Failed to find count down latch with given name: " + name);
+
+                val = latchVal.get();
+
+                tx.rollback();
+            }
+
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
     private class CountDownCallable implements Callable<Integer> {
         /** Value to count down on (if 0 then latch is counted down to 0). */
         private final int val;
@@ -364,9 +379,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
                     if (log.isDebugEnabled())
                         log.debug("Failed to find count down latch with given name: " + name);
 
-                    assert cnt == 0;
-
-                    return cnt;
+                    return 0;
                 }
 
                 int retVal;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index bb451c7..89b2a31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -124,7 +124,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         cache = ctx.cache().utilityCache();
 
-        ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
+        if (!ctx.clientNode())
+            ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
 
         try {
             if (ctx.deploy().enabled())
@@ -165,7 +166,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         busyLock.block();
 
-        ctx.event().removeLocalEventListener(topLsnr);
+        if (!ctx.clientNode())
+            ctx.event().removeLocalEventListener(topLsnr);
 
         if (cfgQryId != null)
             cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
@@ -1164,7 +1166,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                                 }
                             }
                             catch (IgniteCheckedException ex) {
-                                log.error("Failed to clean up zombie assignments for service: " + name, ex);
+                                U.error(log, "Failed to clean up zombie assignments for service: " + name, ex);
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index 8e13bc4..67ddc6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -68,9 +68,15 @@ class GridServiceProxy<T> implements Serializable {
      * @param name Service name.
      * @param svc Service type class.
      * @param sticky Whether multi-node request should be done.
+     * @param ctx Context.
      */
-    @SuppressWarnings("unchecked") GridServiceProxy(ClusterGroup prj, String name, Class<? super T> svc,
-        boolean sticky, GridKernalContext ctx) {
+    @SuppressWarnings("unchecked")
+    GridServiceProxy(ClusterGroup prj,
+        String name,
+        Class<? super T> svc,
+        boolean sticky,
+        GridKernalContext ctx)
+    {
         this.prj = prj;
         this.ctx = ctx;
         hasLocNode = hasLocalNode(prj);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f457d6c..7516f79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -626,6 +626,12 @@ public abstract class IgniteUtils {
             }
         });
 
+        m.put(IgniteClientDisconnectedCheckedException.class, new C1<IgniteCheckedException, IgniteException>() {
+            @Override public IgniteException apply(IgniteCheckedException e) {
+                return new IgniteClientDisconnectedException(e.getMessage(), e);
+            }
+        });
+
         return m;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
index 46d6716..038ea59 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
@@ -39,7 +39,8 @@ public interface DiscoverySpiDataExchange {
     /**
      * Notifies discovery manager about data received from remote node.
      *
-     * @param joiningNodeId Remote node ID.
+     * @param joiningNodeId ID of new node that joins topology.
+     * @param nodeId ID of the node provided data.
      * @param data Collection of discovery data objects from different components.
      */
     public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 0512074..23b8a15 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -65,6 +65,13 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
     protected abstract int serverCount();
 
     /**
+     * @return Number of client nodes started before tests.
+     */
+    protected int clientCount() {
+        return 0;
+    }
+
+    /**
      * @param ignite Node.
      * @return Discovery SPI.
      */
@@ -79,7 +86,17 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         int srvs = serverCount();
 
         if (srvs > 0)
-            startGrids(srvs);
+            startGridsMultiThreaded(srvs);
+
+        int clients = clientCount();
+
+        if (clients > 0) {
+            clientMode = true;
+
+            startGridsMultiThreaded(srvs, clients);
+
+            clientMode = false;
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
new file mode 100644
index 0000000..bbb7eef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * TODO IGNITE-901: test AtomicReference, AtomicStamped, usage after remove, test API block, fail current call on disconnect.
+ */
+public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicLongReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true);
+
+        assertEquals(0L, clientAtomicLong.getAndAdd(1));
+
+        IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false);
+
+        assertEquals(1L, srvAtomicLong.getAndAdd(1));
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+        assertEquals(2L, srvAtomicLong.getAndAdd(1));
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        assertEquals(3L, clientAtomicLong.getAndAdd(1));
+
+        assertEquals(4L, srvAtomicLong.getAndAdd(1));
+
+        assertEquals(5L, clientAtomicLong.getAndAdd(1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicLongReconnectRemoved() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true);
+
+        assertEquals(0L, clientAtomicLong.getAndAdd(1));
+
+        IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false);
+
+        assertEquals(1L, srvAtomicLong.getAndAdd(1));
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+        srvAtomicLong.close();
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                clientAtomicLong.getAndAdd(1);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLatchReconnect1() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true);
+
+        assertEquals(3, clientLatch.count());
+
+        IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false);
+
+        assertEquals(3, srvLatch.count());
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override
+            public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+        srvLatch.countDown();
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        assertEquals(2, srvLatch.count());
+        assertEquals(2, clientLatch.count());
+
+        srvLatch.countDown();
+
+        assertEquals(1, srvLatch.count());
+        assertEquals(1, clientLatch.count());
+
+        clientLatch.countDown();
+
+        assertEquals(0, srvLatch.count());
+        assertEquals(0, clientLatch.count());
+
+        assertTrue(srvLatch.await(1000));
+        assertTrue(clientLatch.await(1000));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testLatchReconnect2() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final IgniteCountDownLatch clientLatch = client.countDownLatch("latch2", 1, false, true);
+
+        IgniteCountDownLatch srvLatch = srv.countDownLatch("latch2", 1, false, false);
+
+        assertFalse(clientLatch.await(100));
+
+        IgniteInternalFuture<Boolean> waitFut = GridTestUtils.runAsync(new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                return clientLatch.await(60_000, MILLISECONDS);
+            }
+        });
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+        srvLatch.countDown();
+
+        assertNotDone(waitFut);
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        assertTrue(waitFut.get(5000));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 5687010..452f808 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -129,8 +129,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
                     assertEquals(1, reconnectLatch.getCount());
 
                     blockPutRef.set(GridTestUtils.runAsync(new Callable() {
-                        @Override
-                        public Object call() throws Exception {
+                        @Override public Object call() throws Exception {
                             log.info("Start put.");
 
                             cache.put(2, 2);
@@ -218,8 +217,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         final CountDownLatch reconnectLatch = new CountDownLatch(1);
 
         client.events().localListen(new IgnitePredicate<Event>() {
-            @Override
-            public boolean apply(Event evt) {
+            @Override public boolean apply(Event evt) {
                 if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
                     info("Reconnected: " + evt);
 
@@ -305,8 +303,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         }, new int[]{EVT_CLIENT_NODE_RECONNECTED});
 
         IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
-            @Override
-            public Ignite call() throws Exception {
+            @Override public Ignite call() throws Exception {
                 try {
                     return startGrid(SRV_CNT);
                 } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
new file mode 100644
index 0000000..fcb74cd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * TODO IGNITE-901: test for queue, check set/queue usage after remove, test API block, fail current call on disconnect.
+ */
+public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSetReconnect() throws Exception {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(TRANSACTIONAL);
+
+        setReconnect(colCfg);
+
+        colCfg = new CollectionConfiguration();
+
+        colCfg.setCacheMode(PARTITIONED);
+        colCfg.setAtomicityMode(ATOMIC);
+
+        setReconnect(colCfg);
+    }
+
+    /**
+     * @param colCfg Collection configuration.
+     * @throws Exception If failed.
+     */
+    private void setReconnect(CollectionConfiguration colCfg) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final String setName = "set-" + colCfg.getAtomicityMode();
+
+        IgniteSet<String> clientSet = client.set(setName, colCfg);
+
+        IgniteSet<String> srvSet = srv.set(setName, null);
+
+        assertTrue(clientSet.add("1"));
+
+        assertFalse(srvSet.add("1"));
+
+        final CountDownLatch disconnectLatch = new CountDownLatch(1);
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        final TestTcpDiscoverySpi clientSpi = spi(client);
+
+        log.info("Block reconnect.");
+
+        clientSpi.writeLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    disconnectLatch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+
+        assertTrue(srvSet.add("2"));
+
+        log.info("Allow reconnect.");
+
+        clientSpi.writeLatch.countDown();
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        assertFalse(clientSet.add("2"));
+
+        assertTrue(clientSet.remove("2"));
+
+        assertFalse(srvSet.contains("2"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
new file mode 100644
index 0000000..01eb2ca
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ *
+ */
+public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectOperationInProgress() throws Exception {
+        // TODO IGNITE-901.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
new file mode 100644
index 0000000..bf0130b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+
+import javax.cache.event.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientReconnectAbstractTest {
+    /** */
+    private static volatile CountDownLatch latch;
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEventListenerReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        EventListener lsnr = new EventListener();
+
+        UUID opId = client.events().remoteListen(lsnr, null, EventType.EVT_JOB_STARTED);
+
+        lsnr.latch = new CountDownLatch(1);
+
+        log.info("Created remote listener: " + opId);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        client.compute().run(new DummyJob());
+
+        assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+        lsnr.latch = new CountDownLatch(1);
+
+        srv.compute().run(new DummyJob());
+
+        assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+        lsnr.latch = new CountDownLatch(1);
+
+        log.info("Stop listen, should not get events anymore.");
+
+        client.events().stopRemoteListen(opId);
+
+        assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMessageListenerReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final String topic = "testTopic";
+
+        MessageListener locLsnr  = new MessageListener();
+
+        UUID opId = client.message().remoteListen(topic, new RemoteMessageListener());
+
+        client.message().localListen(topic, locLsnr);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        locLsnr.latch = new CountDownLatch(1);
+        latch = new CountDownLatch(2);
+
+        client.message().send(topic, "msg1");
+
+        assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+        assertTrue(latch.await(5000, MILLISECONDS));
+
+        locLsnr.latch = new CountDownLatch(1);
+        latch = new CountDownLatch(2);
+
+        srv.message().send(topic, "msg2");
+
+        assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+        assertTrue(latch.await(5000, MILLISECONDS));
+
+        log.info("Stop listen, should not get remote messages anymore.");
+
+        client.message().stopRemoteListen(opId);
+
+        srv.message().send(topic, "msg3");
+
+        locLsnr.latch = new CountDownLatch(1);
+        latch = new CountDownLatch(1);
+
+        assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+        assertFalse(latch.await(3000, MILLISECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheContinuousQueryReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = clientCache.query(qry);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        lsnr.latch = new CountDownLatch(1);
+
+        clientCache.put(1, 1);
+
+        assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+        lsnr.latch = new CountDownLatch(1);
+
+        srv.cache(null).put(2, 2);
+
+        assertTrue(lsnr.latch.await(5000, MILLISECONDS));
+
+        log.info("Close cursor, should not get cache events anymore.");
+
+        cur.close();
+
+        lsnr.latch = new CountDownLatch(1);
+
+        clientCache.put(3, 3);
+
+        assertFalse(lsnr.latch.await(3000, MILLISECONDS));
+    }
+
+    // TODO IGNITE-901 test operations in progress are cancelled.
+
+    /**
+     *
+     */
+    private static class EventListener implements P2<UUID, Event> {
+        /** */
+        private volatile CountDownLatch latch;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, Event evt) {
+            assertTrue(ignite.cluster().localNode().isClient());
+
+            ignite.log().info("Received event: " + evt);
+
+            if (latch != null)
+                latch.countDown();
+
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class MessageListener implements P2<UUID, Object> {
+        /** */
+        private volatile CountDownLatch latch;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, Object msg) {
+            assertTrue(ignite.cluster().localNode().isClient());
+
+            ignite.log().info("Local listener received message: " + msg);
+
+            if (latch != null)
+                latch.countDown();
+
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class RemoteMessageListener implements P2<UUID, Object> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, Object msg) {
+            ignite.log().info("Remote listener received message: " + msg);
+
+            if (latch != null)
+                latch.countDown();
+
+            return true;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+        /** */
+        private volatile CountDownLatch latch;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            int cnt = 0;
+
+            for (CacheEntryEvent<?, ?> evt : evts) {
+                ignite.log().info("Received cache event: " + evt);
+
+                cnt++;
+            }
+
+            assertEquals(1, cnt);
+
+            if (latch != null)
+                latch.countDown();
+        }
+    }
+
+    /**
+     *
+     */
+    static class DummyJob implements IgniteRunnable {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            ignite.log().info("Job run.");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index 7533a2c..0f5b3ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -34,6 +34,10 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
         suite.addTestSuite(IgniteClientReconnectApiBlockTest.class);
         suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
         suite.addTestSuite(IgniteClientReconnectCacheTest.class);
+        suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class);
+        suite.addTestSuite(IgniteClientReconnectComputeTest.class);
+        suite.addTestSuite(IgniteClientReconnectAtomicsTest.class);
+        suite.addTestSuite(IgniteClientReconnectCollectionsTest.class);
 
         return suite;
     }