You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/08 17:19:11 UTC

[1/2] incubator-ignite git commit: # ignite-901

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 a3318e3ee -> 86d963f98


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
new file mode 100644
index 0000000..b55ac57
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
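+/**
+ * Runs SQL and SQL fields queries from a client node through the inherited {@code reconnectFailover} harness.
+ */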
+public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientReconnectFailoverAbstractTest {
+    /** {@inheritDoc} Additionally configures the unnamed cache that the queries in this test run against. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); // No name set: the test looks it up with cache(null).
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setIndexedTypes(Integer.class, Person.class); // Registers Integer -> Person as an indexed (SQL-queryable) type pair.
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Puts keys {@code 0..10_000} into the cache obtained from the node at index {@code serverCount()}, then
+     * passes a query task to the inherited {@code reconnectFailover} harness. The task asserts that the SQL
+     * query {@code id > 1} returns 9999 entries and that {@code avg(p.id)} truncates to 5000.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReconnectCacheQueries() throws Exception {
+        final Ignite client = grid(serverCount());
+
+        final IgniteCache<Integer, Person> cache = client.cache(null);
+
+        assertNotNull(cache);
+
+        for (int i = 0; i <= 10_000; i++)
+            cache.put(i, new Person(i, "name-" + i));
+
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                SqlQuery<Integer, Person> sqlQry = new SqlQuery<>(Person.class, "where id > 1");
+
+                assertEquals(9999, cache.query(sqlQry).getAll().size()); // Ids 2..10_000.
+
+                SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select avg(p.id) from Person p");
+
+                List<List<?>> res = cache.query(fieldsQry).getAll();
+
+                assertEquals(1, res.size());
+
+                Double avg = (Double)res.get(0).get(0);
+
+                assertEquals(5_000, avg.intValue()); // Mean of ids 0..10_000.
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Cache value type used by this test; both fields are annotated with {@link QuerySqlField}.
+     */
+    public static class Person {
+        /** Person id, referenced as {@code id} by the test queries. */
+        @QuerySqlField
+        public int id;
+
+        /** Person name. */
+        @QuerySqlField
+        public String name;
+
+        /**
+         * @param id Id.
+         * @param name Name.
+         */
+        public Person(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * @return Id.
+         */
+        public int getId() {
+            return id;
+        }
+
+        /**
+         * @param id New id.
+         */
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * @param name New name.
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} Persons are equal when they are of the same class and have the same {@code id}. */
+        @Override public boolean equals(Object o) {
+            return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id;
+        }
+
+        /** {@inheritDoc} The hash code is the {@code id}, matching {@link #equals(Object)}. */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
new file mode 100644
index 0000000..bca7f3c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Tests SQL and scan queries issued from a client node before, during and after a client reconnect.
+ */
+public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest {
+    /** Name of the cache that all queries in this test run against. */
+    public static final String QUERY_CACHE = "query";
+
+    /** {@inheritDoc} This test uses three server nodes. */
+    @Override protected int serverCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} This test uses a single client node. */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} Additionally configures the {@link #QUERY_CACHE} cache with {@code Integer -> Person} indexed types. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>(QUERY_CACHE)
+            .setCacheMode(PARTITIONED)
+            .setAtomicityMode(ATOMIC)
+            .setBackups(1)
+            .setIndexedTypes(Integer.class, Person.class);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Afterwards removes all entries from {@link #QUERY_CACHE} through node 0. */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        grid(0).getOrCreateCache(QUERY_CACHE).removeAll();
+    }
+
+    /**
+     * Opens an SQL query cursor (page size 1) from the client over three entries, then calls the inherited
+     * {@code reconnectClientNode} with a callback that adds a fourth entry through the server cache and
+     * expects a new client query to fail with {@link CacheException}. After that call returns, the cursor
+     * opened beforehand is expected to yield all four entries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testQueryReconnect() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
+
+        qry.setPageSize(1);
+
+        QueryCursor<Cache.Entry<Integer, Person>> cur = clnCache.query(qry);
+
+        reconnectClientNode(cln, srv, new Runnable() {
+            @Override public void run() {
+                srvCache.put(4, new Person(4, "name4", "surname4"));
+
+                try {
+                    clnCache.query(qry);
+
+                    fail();
+                } catch (CacheException e) {
+                    check(e);
+                }
+            }
+        });
+
+        List<Cache.Entry<Integer, Person>> res = cur.getAll();
+
+        assertNotNull(res);
+        assertEquals(4, res.size());
+    }
+
+    /**
+     * Blocks {@code GridQueryNextPageResponse} messages on every server node, starts fetching an SQL cursor
+     * from the client via {@code GridTestUtils.runAsync} and verifies that the fetch has not completed after
+     * 200 ms. Messages are then unblocked and the client is reconnected; the asynchronous fetch is expected
+     * to have failed with {@link CacheException} (its future yields {@code true}), and a fresh query must
+     * return the three entries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReconnectQueryInProgress() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        blockMessage(GridQueryNextPageResponse.class);
+
+        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
+
+        qry.setPageSize(1);
+
+        final QueryCursor<Cache.Entry<Integer, Person>> cur1 = clnCache.query(qry);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    cur1.getAll();
+                }
+                catch (CacheException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that the client operation is still waiting: the future must time out.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        unblockMessage();
+
+        reconnectClientNode(cln, srv, null);
+
+        assertTrue((Boolean) fut.get(2, SECONDS));
+
+        QueryCursor<Cache.Entry<Integer, Person>> cur2 = clnCache.query(qry);
+
+        assertEquals(3, cur2.getAll().size());
+    }
+
+    /**
+     * Loads 100 entries and opens a scan query cursor (page size 1, accept-all filter) from the client, then
+     * calls the inherited {@code reconnectClientNode} with a callback that adds key 1000 through the server
+     * cache and expects a new client scan query to fail with {@link CacheException}. Unlike the SQL case in
+     * {@link #testQueryReconnect()}, the cursor opened before the reconnect is expected to fail as well;
+     * a fresh scan query must then return 101 entries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testScanQueryReconnect() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
+
+        for (int i = 0; i < 100; i++)
+            clnCache.put(i, new Person(i, "name-" + i, "surname-" + i));
+
+        final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+
+        scanQry.setPageSize(1);
+
+        scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
+            @Override public boolean apply(Integer integer, Person person) {
+                return true;
+            }
+        });
+
+        QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
+
+        reconnectClientNode(cln, srv, new Runnable() {
+            @Override public void run() {
+                srvCache.put(1000, new Person(1000, "name", "surname"));
+
+                try {
+                    clnCache.query(scanQry);
+
+                    fail();
+                }
+                catch (CacheException e) {
+                    check(e);
+                }
+            }
+        });
+
+        try {
+            qryCursor.getAll();
+
+            fail();
+        }
+        catch (CacheException e) {
+            checkAndWait(e);
+        }
+
+        qryCursor = clnCache.query(scanQry);
+
+        assertEquals(101, qryCursor.getAll().size());
+    }
+
+    /**
+     * Runs the in-progress scan query reconnect scenario without setting a partition on the query.
+     *
+     * @throws Exception If failed.
+     */
+    public void testScanQueryReconnectInProgress1() throws Exception {
+        scanQueryReconnectInProgress(false);
+    }
+
+    /**
+     * Runs the in-progress scan query reconnect scenario with a partition set on the query.
+     *
+     * @throws Exception If failed.
+     */
+    public void testScanQueryReconnectInProgress2() throws Exception {
+        scanQueryReconnectInProgress(true);
+    }
+
+    /**
+     * Blocks {@code GridCacheQueryResponse} messages on every server node, starts a scan query (page size 1,
+     * accept-all filter) from the client via {@code GridTestUtils.runAsync} and verifies that it has not
+     * completed after 200 ms. Messages are then unblocked and the client is reconnected; the asynchronous
+     * query is expected to have failed with {@link CacheException} (its future yields {@code true}).
+     * A fresh scan query must then return one entry when partition 1 was set, or all three otherwise.
+     *
+     * @param setPart If {@code true} sets partition for scan query.
+     * @throws Exception If failed.
+     */
+    private void scanQueryReconnectInProgress(boolean setPart) throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+
+        scanQry.setPageSize(1);
+
+        scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
+            @Override public boolean apply(Integer integer, Person person) {
+                return true;
+            }
+        });
+
+        if (setPart)
+            scanQry.setPartition(1);
+
+        blockMessage(GridCacheQueryResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
+
+                    qryCursor.getAll();
+                }
+                catch (CacheException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that the client operation is still waiting: the future must time out.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        unblockMessage();
+
+        reconnectClientNode(cln, srv, null);
+
+        assertTrue((Boolean)fut.get(2, SECONDS));
+
+        QueryCursor<Cache.Entry<Integer, Person>> qryCursor2 = clnCache.query(scanQry);
+
+        assertEquals(setPart ? 1 : 3, qryCursor2.getAll().size()); // With setPart only partition 1 is scanned; the test expects one of the three keys there.
+    }
+
+    /**
+     * Asks the communication SPI of every server node (indexes {@code 0..serverCount() - 1}) to block
+     * messages of the given class.
+     *
+     * @param clazz Message class.
+     */
+    private void blockMessage(Class<?> clazz) {
+        for (int i = 0; i < serverCount(); i++) {
+            BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
+
+            commSpi.blockMessage(clazz);
+        }
+    }
+
+    /**
+     * Calls {@code unblockMessage()} on the communication SPI of every server node, undoing {@link #blockMessage(Class)}.
+     */
+    private void unblockMessage() {
+        for (int i = 0; i < serverCount(); i++) {
+            BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
+
+            commSpi.unblockMessage();
+        }
+    }
+
+    /**
+     * Cache value type used by this test; all three fields are annotated with {@link QuerySqlField}.
+     */
+    public static class Person {
+        /** Person id. */
+        @QuerySqlField
+        public int id;
+
+        /** Person name. */
+        @QuerySqlField
+        public String name;
+
+        /** Person surname. */
+        @QuerySqlField
+        public String surname;
+
+        /**
+         * @param id Id.
+         * @param name Name.
+         * @param surname Surname.
+         */
+        public Person(int id, String name, String surname) {
+            this.id = id;
+            this.name = name;
+            this.surname = surname;
+        }
+
+        /**
+         * @return Id.
+         */
+        public int getId() {
+            return id;
+        }
+
+        /**
+         * @param id New id.
+         */
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * @param name New name.
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /**
+         * @return Surname.
+         */
+        public String getSurname() {
+            return surname;
+        }
+
+        /**
+         * @param surname New surname.
+         */
+        public void setSurname(String surname) {
+            this.surname = surname;
+        }
+
+        /** {@inheritDoc} Persons are equal when they are of the same class and have the same {@code id}. */
+        @Override public boolean equals(Object o) {
+            return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id;
+        }
+
+        /** {@inheritDoc} The hash code is the {@code id}, matching {@link #equals(Object)}. */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
index 8ab2485..ca80acf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
@@ -375,6 +375,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
         /** */
         private int cnt;
 
+        /** {@inheritDoc} */
         @Override public boolean collect(List<?> e) {
             sum += (Integer)e.get(0);
 
@@ -383,6 +384,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
             return true;
         }
 
+        /** {@inheritDoc} */
         @Override public IgniteBiTuple<Integer, Integer> reduce() {
             return F.t(sum, cnt);
         }
@@ -398,6 +400,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
         /** */
         private int cnt;
 
+        /** {@inheritDoc} */
         @Override public boolean collect(IgniteBiTuple<Integer, Integer> t) {
             sum += t.get1();
             cnt += t.get2();
@@ -405,6 +408,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
             return true;
         }
 
+        /** {@inheritDoc} */
         @Override public Integer reduce() {
             return cnt == 0 ? 0 : sum / cnt;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 72ba15d..ce59995 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -126,9 +126,6 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         //Unmarshallig query test.
         suite.addTestSuite(IgniteCacheP2pUnmarshallingQueryErrorTest.class);
 
-        // Reconnect client query test.
-        suite.addTestSuite(IgniteClientReconnectQueriesTest.class);
-
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 67ebda9..cc01540 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -52,6 +52,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
         suite.addTestSuite(CacheConfigurationP2PTest.class);
 
         suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class);
+        suite.addTestSuite(IgniteClientReconnectQueriesTest.class);
 
         return suite;
     }


[2/2] incubator-ignite git commit: # ignite-901

Posted by sb...@apache.org.
# ignite-901


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/86d963f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/86d963f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/86d963f9

Branch: refs/heads/ignite-901
Commit: 86d963f98f3d3db33effdc482654e86d5b02bc52
Parents: a3318e3
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 8 10:08:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 8 18:19:00 2015 +0300

----------------------------------------------------------------------
 .../IgniteClientDisconnectedException.java      |  10 +-
 .../ignite/internal/GridJobSiblingImpl.java     |   2 +-
 .../ignite/internal/GridKernalGatewayImpl.java  |   4 +-
 .../apache/ignite/internal/IgniteKernal.java    |   3 +
 .../internal/cluster/IgniteClusterImpl.java     |   3 +
 .../internal/managers/GridManagerAdapter.java   |   7 +-
 .../deployment/GridDeploymentCommunication.java |   2 +-
 .../deployment/GridDeploymentManager.java       |  11 +-
 .../discovery/GridDiscoveryManager.java         |  64 ++-
 .../processors/cache/GridCacheGateway.java      |   3 +-
 .../processors/cache/GridCacheUtils.java        |   5 +-
 .../processors/cache/IgniteCacheFutureImpl.java |   5 +
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../clock/GridClockSyncProcessor.java           |   2 +-
 .../datastreamer/DataStreamerImpl.java          |  17 +-
 .../GridCacheCountDownLatchImpl.java            |   2 +-
 .../processors/job/GridJobProcessor.java        |   2 +-
 .../internal/processors/job/GridJobWorker.java  |   2 +-
 .../processors/task/GridTaskProcessor.java      |  41 +-
 .../processors/task/GridTaskWorker.java         |  59 ++-
 .../ignite/internal/util/IgniteUtils.java       |  19 +
 .../communication/tcp/TcpCommunicationSpi.java  |  16 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  90 ++--
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  12 +
 .../IgniteClientReconnectAbstractTest.java      |   3 +
 ...niteClientReconnectFailoverAbstractTest.java | 228 ++++++++++
 .../IgniteClientReconnectFailoverTest.java      | 167 +-------
 .../IgniteSlowClientDetectionSelfTest.java      |   1 +
 .../cache/IgniteCacheDynamicStopSelfTest.java   |   6 +-
 .../IgniteTxExceptionAbstractSelfTest.java      |   1 +
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  21 +
 .../h2/twostep/GridReduceQueryExecutor.java     |  16 +-
 .../IgniteClientReconnectQueriesTest.java       | 428 -------------------
 ...ClientReconnectCacheQueriesFailoverTest.java | 149 +++++++
 .../cache/IgniteClientReconnectQueriesTest.java | 428 +++++++++++++++++++
 ...dCacheAbstractReduceFieldsQuerySelfTest.java |   4 +
 .../IgniteCacheQuerySelfTestSuite.java          |   3 -
 .../IgniteCacheWithIndexingTestSuite.java       |   1 +
 38 files changed, 1147 insertions(+), 692 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
index 726091f..c40dd9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java
@@ -33,6 +33,14 @@ public class IgniteClientDisconnectedException extends IgniteException {
     /**
      * @param reconnectFut Reconnect future.
      * @param msg Error message.
+     */
+    public IgniteClientDisconnectedException(IgniteFuture<?> reconnectFut, String msg) {
+        this(reconnectFut, msg, null);
+    }
+
+    /**
+     * @param reconnectFut Reconnect future.
+     * @param msg Error message.
      * @param cause Optional nested exception (can be {@code null}).
      */
     public IgniteClientDisconnectedException(
@@ -41,8 +49,6 @@ public class IgniteClientDisconnectedException extends IgniteException {
         @Nullable Throwable cause) {
         super(msg, cause);
 
-        assert reconnectFut != null;
-
         this.reconnectFut = reconnectFut;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
index 62adf52..b4e0f01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
@@ -167,7 +167,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
                 }
                 catch (IgniteCheckedException e) {
                     // Avoid stack trace for left nodes.
-                    if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNode(node.id()))
+                    if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNodeNoError(node.id()))
                         U.error(ctx.log(GridJobSiblingImpl.class), "Failed to send cancel request to node " +
                             "[nodeId=" + node.id() + ", ses=" + ses + ']', e);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index b1f4df8..fa395e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -78,7 +78,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
             rwLock.readUnlock();
 
             if (state == GridKernalState.DISCONNECTED)
-                throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null);
+                throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
 
             throw illegalState();
         }
@@ -92,7 +92,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
         rwLock.readLock();
 
         if (state == GridKernalState.DISCONNECTED)
-            throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null);
+            throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 0dd3c29..0a9d093 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2186,6 +2186,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
             return false;
         }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
         finally {
             unguard();
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index c4de2da..246eab5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -123,6 +123,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
         try {
             return ctx.discovery().pingNode(nodeId);
         }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
         finally {
             unguard();
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 1cbe68d..9faa056 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -328,7 +328,12 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                     @Override public boolean pingNode(UUID nodeId) {
                         A.notNull(nodeId, "nodeId");
 
-                        return ctx.discovery().pingNode(nodeId);
+                        try {
+                            return ctx.discovery().pingNode(nodeId);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw U.convertException(e); // No throws clause here: rethrow as unchecked.
+                        }
                     }
 
                     @Override public void send(ClusterNode node, Serializable msg, String topic)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index 443b221..3b886a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -293,7 +293,7 @@ class GridDeploymentCommunication {
                     log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']');
             }
             catch (IgniteCheckedException e) {
-                if (ctx.discovery().pingNode(nodeId))
+                if (ctx.discovery().pingNodeNoError(nodeId))
                     U.error(log, "Failed to send peer class loading response to node: " + nodeId, e);
                 else if (log.isDebugEnabled())
                     log.debug("Failed to send peer class loading response to node " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
index 9e418a5..75fb41e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
@@ -94,13 +94,7 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
 
         comm.start();
 
-        locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm);
-        ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm);
-        verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm);
-
-        locStore.start();
-        ldrStore.start();
-        verStore.start();
+        startStores();
 
         if (log.isDebugEnabled()) {
             log.debug("Local deployment: " + locDep);
@@ -129,6 +123,9 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
 
         storesStop();
 
+        if (comm != null)
+            comm.stop();
+
         getSpi().setListener(null);
 
         stopSpi();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 4a064d1..096f0e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -477,6 +477,27 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     return;
                 }
+                else if (type == EVT_CLIENT_NODE_DISCONNECTED) {
+                    /*
+                     * Notify all components from discovery thread to avoid concurrent
+                     * reconnect while disconnect handling is in progress.
+                     */
+
+                    assert locNode.isClient() : locNode;
+                    assert node.isClient() : node;
+
+                    ((IgniteKernal)ctx.grid()).onDisconnected();
+
+                    DiscoveryEvent evt = new DiscoveryEvent();
+
+                    evt.node(ctx.discovery().localNode());
+                    evt.eventNode(node);
+                    evt.type(type);
+
+                    ctx.event().record(evt);
+
+                    return;
+                }
 
                 discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
             }
@@ -1106,8 +1127,36 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * @param nodeId ID of the node.
      * @return {@code True} if ping succeeded.
+     * @throws IgniteClientDisconnectedCheckedException If ping failed because the client node is disconnected.
      */
-    public boolean pingNode(UUID nodeId) {
+    public boolean pingNode(UUID nodeId) throws IgniteClientDisconnectedCheckedException {
+        assert nodeId != null;
+
+        if (!busyLock.enterBusy())
+            return false; // Busy lock not entered: skip the SPI call and report failure.
+
+        try {
+            return getSpi().pingNode(nodeId);
+        }
+        catch (IgniteException e) {
+            if (e.hasCause(IgniteClientDisconnectedCheckedException.class)) { // Rethrow with reconnect future.
+                IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture();
+
+                throw new IgniteClientDisconnectedCheckedException(reconnectFut, e.getMessage());
+            }
+
+            throw e; // Not caused by disconnect: propagate the SPI failure unchanged.
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param nodeId ID of the node.
+     * @return {@code True} if ping succeeded.
+     */
+    public boolean pingNodeNoError(UUID nodeId) {
         assert nodeId != null;
 
         if (!busyLock.enterBusy())
@@ -1897,20 +1946,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     break;
                 }
 
-                case EVT_CLIENT_NODE_DISCONNECTED: {
-                    assert localNode().isClient() : evt;
-
-                    ((IgniteKernal)ctx.grid()).onDisconnected();
-
-                    break;
-                }
-
                 case EVT_CLIENT_NODE_RECONNECTED: {
                     assert localNode().isClient() : evt;
 
                     // TODO IGNITE-901.
                     ((IgniteKernal)ctx.grid()).reconnected(false);
 
+                    if (log.isInfoEnabled())
+                        log.info("Client node reconnected to cluster: " + node);
+
+                    ackTopology(topVer.topologyVersion(), true);
+
                     break;
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index a9a73eb..da409a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -68,6 +68,7 @@ public class GridCacheGateway<K, V> {
     /**
      * @param lock {@code True} if lock is held.
      * @param stopErr {@code True} if throw exception if stopped.
+     * @return {@code True} if cache is in started state.
      */
     private boolean checkState(boolean lock, boolean stopErr) {
         State state = this.state;
@@ -86,7 +87,7 @@ public class GridCacheGateway<K, V> {
                 assert reconnectFut != null;
 
                 throw new CacheException(
-                    new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null));
+                    new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + ctx.gridName()));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 6faf6e4..bd2623d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1565,8 +1565,11 @@ public class GridCacheUtils {
             (IgniteClientDisconnectedCheckedException)e
             : e.getCause(IgniteClientDisconnectedCheckedException.class);
 
-        if (disconnectedErr != null)
+        if (disconnectedErr != null) {
+            assert disconnectedErr.reconnectFuture() != null : disconnectedErr;
+
             e = disconnectedErr;
+        }
 
         if (e.hasCause(CacheWriterException.class))
             return new CacheWriterException(U.convertExceptionNoWrap(e));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
index 06c28e6..13af004 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
@@ -37,6 +37,11 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> {
 
     /** {@inheritDoc} */
     @Override protected RuntimeException convertException(IgniteCheckedException e) {
+        if (e instanceof IgniteFutureCancelledCheckedException ||
+            e instanceof IgniteInterruptedCheckedException ||
+            e instanceof IgniteFutureTimeoutCheckedException)
+            return U.convertException(e); // These three bypass the cache exception conversion below.
+
         return CU.convertToCacheException(e);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 969d7a2..f33f791 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -223,7 +223,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
             final CacheConfiguration cfg = cctx.config();
 
-            if (cfg.getRebalanceDelay() >= 0) {
+            if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
                 U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
 
                 demandPool.syncFuture().listen(new CI1<Object>() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 2920176..478426f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -295,7 +295,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
                     ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {
-                    if (ctx.discovery().pingNode(n.id()))
+                    if (ctx.discovery().pingNodeNoError(n.id()))
                         U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) " +
                             "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']');
                     else if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 55915f3..605f478 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1275,11 +1275,18 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                         log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
                 }
                 catch (IgniteCheckedException e) {
-                    if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
-                        ((GridFutureAdapter<Object>)fut).onDone(e);
-                    else
-                        ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
-                            "request (node has left): " + node.id()));
+                    GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut);
+
+                    try {
+                        if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+                            fut0.onDone(e);
+                        else
+                            fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): "
+                                + node.id()));
+                    }
+                    catch (IgniteClientDisconnectedCheckedException e0) {
+                        fut0.onDone(e0);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 2d3cf13..cfc051c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -339,7 +339,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
                 GridCacheCountDownLatchValue latchVal = latchView.get(key);
 
                 if (latchVal == null)
-                    throw new IgniteCheckedException("Failed to find count down latch with given name: " + name);
+                    return 0;
 
                 val = latchVal.get();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 48e9686..350068a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1413,7 +1413,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      * @return {@code true} if node is dead, {@code false} is node is alive.
      */
     private boolean isDeadNode(UUID uid) {
-        return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
+        return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index d1ee5ad..3a309f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -863,7 +863,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      * @return {@code true} if node is dead, {@code false} is node is alive.
      */
     private boolean isDeadNode(UUID uid) {
-        return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
+        return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 65ce557..d3caf5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -121,11 +121,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
         IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut);
 
-        for (GridTaskWorker<?, ?> worker : tasks.values()) {
+        for (GridTaskWorker<?, ?> worker : tasks.values())
             worker.finishTask(null, err);
-
-            worker.cancel();
-        }
     }
 
     /**
@@ -617,31 +614,29 @@ public class GridTaskProcessor extends GridProcessorAdapter {
 
                 assert taskWorker0 == null : "Session ID is not unique: " + sesId;
 
-                if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
-                    try {
-                        // Start task execution in another thread.
-                        if (sys)
-                            ctx.getSystemExecutorService().execute(taskWorker);
-                        else
-                            ctx.getExecutorService().execute(taskWorker);
-                    }
-                    catch (RejectedExecutionException e) {
-                        tasks.remove(sesId);
+                if (!ctx.clientDisconnected()) {
+                    if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
+                        try {
+                            // Start task execution in another thread.
+                            if (sys)
+                                ctx.getSystemExecutorService().execute(taskWorker);
+                            else
+                                ctx.getExecutorService().execute(taskWorker);
+                        }
+                        catch (RejectedExecutionException e) {
+                            tasks.remove(sesId);
 
-                        release(dep);
+                            release(dep);
 
-                        handleException(new ComputeExecutionRejectedException("Failed to execute task " +
-                            "due to thread pool execution rejection: " + taskName, e), fut);
+                            handleException(new ComputeExecutionRejectedException("Failed to execute task " +
+                                "due to thread pool execution rejection: " + taskName, e), fut);
+                        }
                     }
+                    else
+                        taskWorker.run();
                 }
                 else
-                    taskWorker.run();
-
-                if (ctx.clientDisconnected()) {
                     taskWorker.finishTask(null, disconnectedError(null));
-
-                    taskWorker.cancel();
-                }
             }
         }
         else {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index eb5fa77..133a31f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1070,10 +1070,17 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                             PUBLIC_POOL);
                 }
                 catch (IgniteCheckedException e) {
-                    if (!isDeadNode(nodeId))
-                        U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
-                            nodeId + ", taskName=" + ses.getTaskName() +
-                            ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
+                    try {
+                        if (!isDeadNode(nodeId))
+                            U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
+                                nodeId + ", taskName=" + ses.getTaskName() +
+                                ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
+                    }
+                    catch (IgniteClientDisconnectedCheckedException e0) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send cancel request to node, client disconnected [nodeId=" +
+                                nodeId + ", taskName=" + ses.getTaskName() + ']');
+                    }
                 }
             }
         }
@@ -1169,24 +1176,39 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
             }
         }
         catch (IgniteCheckedException e) {
-            boolean deadNode = isDeadNode(res.getNode().id());
+            IgniteException fakeErr = null;
 
-            // Avoid stack trace if node has left grid.
-            if (deadNode)
-                U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
-                    "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
-                    ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
-            else
-                U.error(log, "Failed to send job request: " + req, e);
+            try {
+                boolean deadNode = isDeadNode(res.getNode().id());
+
+                // Avoid stack trace if node has left grid.
+                if (deadNode) {
+                    U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
+                        "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
+                        ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
+
+                    fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e);
+                }
+                else
+                    U.error(log, "Failed to send job request: " + req, e);
+
+            }
+            catch (IgniteClientDisconnectedCheckedException e0) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send job request, client disconnected [node=" + node +
+                        ", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" +
+                        res.getJobContext().getJobId() + ']');
+
+                fakeErr = U.convertException(e0);
+            }
 
             GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
                 res.getJobContext().getJobId(), null, null, null, null, null, null, false);
 
-            if (deadNode)
-                fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " +
-                    node, e));
-            else
-                fakeRes.setFakeException(U.convertException(e));
+            if (fakeErr == null)
+                fakeErr = U.convertException(e);
+
+            fakeRes.setFakeException(fakeErr);
 
             onResponse(fakeRes);
         }
@@ -1345,8 +1367,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
      *
      * @param uid UID of node to check.
      * @return {@code true} if node is dead, {@code false} is node is alive.
+     * @throws IgniteClientDisconnectedCheckedException If the ping failed because the client node is disconnected.
      */
-    private boolean isDeadNode(UUID uid) {
+    private boolean isDeadNode(UUID uid) throws IgniteClientDisconnectedCheckedException {
         return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 91d8172..149222e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -682,6 +682,25 @@ public abstract class IgniteUtils {
      * @return Ignite runtime exception.
      */
     public static IgniteException convertException(IgniteCheckedException e) {
+        IgniteClientDisconnectedException e0 = e.getCause(IgniteClientDisconnectedException.class);
+
+        if (e0 != null) {
+            assert e0.reconnectFuture() != null : e0;
+
+            throw e0;
+        }
+
+        IgniteClientDisconnectedCheckedException disconnectedErr =
+            e instanceof IgniteClientDisconnectedCheckedException ?
+            (IgniteClientDisconnectedCheckedException)e
+            : e.getCause(IgniteClientDisconnectedCheckedException.class);
+
+        if (disconnectedErr != null) {
+            assert disconnectedErr.reconnectFuture() != null : disconnectedErr;
+
+            e = disconnectedErr;
+        }
+
         C1<IgniteCheckedException, IgniteException> converter = exceptionConverters.get(e.getClass());
 
         if (converter != null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7691e3f..8ea2b82 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2707,10 +2707,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
             ClusterNode node = recoveryDesc.node();
 
-            if (clients.containsKey(node.id()) ||
-                !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
-                !getSpiContext().pingNode(node.id()))
+            try {
+                if (clients.containsKey(node.id()) ||
+                    !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) ||
+                    !getSpiContext().pingNode(node.id()))
+                    return;
+            }
+            catch (IgniteClientDisconnectedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to ping node, client disconnected.");
+
                 return;
+            }
 
             try {
                 if (log.isDebugEnabled())
@@ -3100,6 +3108,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          * @param nodeId Node ID.
          */
         private NodeIdMessage(UUID nodeId) {
+            assert nodeId != null;
+
             nodeIdBytes = U.uuidToBytes(nodeId);
 
             nodeIdBytesWithType = new byte[nodeIdBytes.length + 1];

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 38ba8fd..b3793b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,6 +70,9 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** */
     private SocketReader sockReader;
 
+    /** */
+    private volatile State state;
+
     /** Last message ID. */
     private volatile IgniteUuid lastMsgId;
 
@@ -255,23 +258,36 @@ class ClientImpl extends TcpDiscoveryImpl {
             if (oldFut != null)
                 fut = oldFut;
             else {
-                if (spi.getSpiContext().isStopping()) {
+                State state = this.state;
+
+                if (spi.getSpiContext().isStopping() || state == State.STOPPED || state == State.SEGMENTED) {
                     if (pingFuts.remove(nodeId, fut))
                         fut.onDone(false);
 
                     return false;
                 }
+                else if (state == State.DISCONNECTED) {
+                    if (pingFuts.remove(nodeId, fut))
+                        fut.onDone(new IgniteClientDisconnectedCheckedException(null,
+                            "Failed to ping node, client node disconnected."));
+                }
+                else {
+                    final GridFutureAdapter<Boolean> finalFut = fut;
+
+                    timer.schedule(new TimerTask() {
+                        @Override public void run() {
+                            if (pingFuts.remove(nodeId, finalFut)) {
+                                if (ClientImpl.this.state == State.DISCONNECTED)
+                                    finalFut.onDone(new IgniteClientDisconnectedCheckedException(null,
+                                        "Failed to ping node, client node disconnected."));
+                                else
+                                    finalFut.onDone(false);
+                            }
+                        }
+                    }, spi.netTimeout);
 
-                final GridFutureAdapter<Boolean> finalFut = fut;
-
-                timer.schedule(new TimerTask() {
-                    @Override public void run() {
-                        if (pingFuts.remove(nodeId, finalFut))
-                            finalFut.onDone(false);
-                    }
-                }, spi.netTimeout);
-
-                sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+                    sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId));
+                }
             }
         }
 
@@ -282,7 +298,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             return false;
         }
         catch (IgniteCheckedException e) {
-            throw new IgniteSpiException(e); // Should newer occur.
+            throw new IgniteSpiException(e);
         }
     }
 
@@ -953,8 +969,7 @@ class ClientImpl extends TcpDiscoveryImpl {
         @Override protected void body() throws InterruptedException {
             assert state == ClientImpl.State.DISCONNECTED
                 || state == ClientImpl.State.CONNECTED
-                || state == ClientImpl.State.STARTING :
-                state;
+                || state == ClientImpl.State.STARTING : state;
 
             boolean success = false;
 
@@ -976,7 +991,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                         else
                             U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " +
-                                "configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+                                "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
 
                         return;
                     }
@@ -1171,21 +1186,36 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         reconnector = null;
 
-                        state = ClientImpl.State.DISCONNECTED;
+                        if (spi.isClientReconnectDisabled()) {
+                            state = ClientImpl.State.SEGMENTED;
+
+                            notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
+                        }
+                        else {
+                            state = ClientImpl.State.DISCONNECTED;
+
+                            nodeAdded = false;
 
-                        nodeAdded = false;
+                            IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
+                                null, "Failed to ping node, client node disconnected.");
 
-                        notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+                            for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+                                GridFutureAdapter<Boolean> fut = e.getValue();
+
+                                if (pingFuts.remove(e.getKey(), fut))
+                                    fut.onDone(err);
+                            }
 
-                        UUID newId = UUID.randomUUID();
+                            notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
 
-                        log.info("Change node id: " + newId);
+                            UUID newId = UUID.randomUUID();
 
-                        rmtNodes.clear();
+                            log.info("Change node id: " + newId + " " + locNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME));
 
-                        locNode.onClientDisconnected(newId);
+                            locNode.onClientDisconnected(newId);
 
-                        tryJoin();
+                            tryJoin();
+                        }
                     }
                     else {
                         TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg;
@@ -1298,11 +1328,13 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @return {@code True} if client in process of join.
          */
         private boolean joining() {
+            ClientImpl.State state = ClientImpl.this.state;
+
             return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED;
         }
 
         /**
-         * @return {@code True} if disconnected.
+         * @return {@code True} if client disconnected.
          */
         private boolean disconnected() {
             return state == ClientImpl.State.DISCONNECTED;
@@ -1795,17 +1827,23 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
     }
 
-    private volatile State state;
-
+    /**
+     * Client node lifecycle states.
+     */
     private enum State {
+        /** */
         STARTING,
 
+        /** */
         CONNECTED,
 
+        /** */
         DISCONNECTED,
 
+        /** */
         SEGMENTED,
 
+        /** */
         STOPPED
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 9446d2d..3995207 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -327,6 +327,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** */
     private boolean forceSrvMode;
 
+    /** */
+    private boolean clientReconnectDisabled;
+
     /** {@inheritDoc} */
     @Override public String getSpiState() {
         return impl.getSpiState();
@@ -416,6 +419,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         return this;
     }
 
+    public boolean isClientReconnectDisabled() {
+        return clientReconnectDisabled;
+    }
+
+    @IgniteSpiConfiguration(optional = true)
+    public void setClientReconnectDisabled(boolean clientReconnectDisabled) {
+        this.clientReconnectDisabled = clientReconnectDisabled;
+    }
+
     /**
      * Inject resources
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index ec043f8..8fca97c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -218,6 +218,9 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
     protected IgniteFuture<?> check(CacheException e) {
         log.info("Expected exception: " + e);
 
+        if (!(e.getCause() instanceof IgniteClientDisconnectedException))
+            log.error("Unexpected cause: " + e.getCause(), e);
+
         assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof IgniteClientDisconnectedException);
 
         IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
new file mode 100644
index 0000000..551cb1a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
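+ * Base class for tests that repeatedly fail a client node while operations run and check reconnect handling.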
+ */
+public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteClientReconnectAbstractTest {
+    /** */
+    private static final Integer THREADS = 1;
+
+    /** */
+    private volatile CyclicBarrier barrier;
+
+    /** */
+    protected static final long TEST_TIME = 90_000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIME * 60_000;
+    }
+
+    /**
+     * @param c Test closure.
+     * @throws Exception If failed.
+     */
+    protected final void reconnectFailover(final Callable<Void> c) throws Exception {
+        final Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final AtomicBoolean stop = new AtomicBoolean(false);
+
+        final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    int iter = 0;
+
+                    while (!stop.get()) {
+                        try {
+                            c.call();
+                        }
+                        catch (CacheException e) {
+                            checkAndWait(e);
+                        }
+                        catch (IgniteClientDisconnectedException e) {
+                            checkAndWait(e);
+                        }
+
+                        if (++iter % 100 == 0)
+                            log.info("Iteration: " + iter);
+
+                        if (barrier != null)
+                            barrier.await();
+                    }
+
+                    return null;
+                } catch (Throwable e) {
+                    log.error("Unexpected error in operation thread: " + e, e);
+
+                    stop.set(true);
+
+                    throw e;
+                }
+            }
+        }, THREADS, "test-operation-thread");
+
+        final AtomicReference<CountDownLatch> disconnected = new AtomicReference<>();
+        final AtomicReference<CountDownLatch> reconnected = new AtomicReference<>();
+
+        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    CountDownLatch latch = reconnected.get();
+
+                    assertNotNull(latch);
+                    assertEquals(1, latch.getCount());
+
+                    latch.countDown();
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    CountDownLatch latch = disconnected.get();
+
+                    assertNotNull(latch);
+                    assertEquals(1, latch.getCount());
+
+                    latch.countDown();
+                }
+
+                return true;
+            }
+        };
+
+        client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        try {
+            long stopTime = System.currentTimeMillis() + TEST_TIME;
+
+            String err = null;
+
+            while (System.currentTimeMillis() < stopTime && !fut.isDone()) {
+                U.sleep(100);
+
+                CountDownLatch disconnectLatch = new CountDownLatch(1);
+                CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+                disconnected.set(disconnectLatch);
+                reconnected.set(reconnectLatch);
+
+                UUID nodeId = client.cluster().localNode().id();
+
+                log.info("Fail client: " + nodeId);
+
+                srvSpi.failNode(nodeId, null);
+
+                if (!disconnectLatch.await(5000, MILLISECONDS)) {
+                    err = "Failed to wait for disconnect";
+
+                    break;
+                }
+
+                if (!reconnectLatch.await(5000, MILLISECONDS)) {
+                    err = "Failed to wait for reconnect";
+
+                    break;
+                }
+
+                barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
+                    @Override public void run() {
+                        barrier = null;
+                    }
+                });
+
+                try {
+                    barrier.await(10, SECONDS);
+                }
+                catch (TimeoutException e) {
+                    err = "Operations hang or fail with unexpected error.";
+
+                    break;
+                }
+            }
+
+            if (err != null) {
+                log.error(err);
+
+                U.dumpThreads(log);
+
+                CyclicBarrier barrier0 = barrier;
+
+                if (barrier0 != null)
+                    barrier0.reset();
+
+                stop.set(true);
+
+                fut.get();
+
+                fail(err);
+            }
+
+            stop.set(true);
+
+            fut.get();
+        }
+        finally {
+            client.events().stopLocalListen(p);
+
+            stop.set(true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
index 35f86f5..7cfc329 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
@@ -19,37 +19,24 @@ package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.testframework.*;
 import org.apache.ignite.transactions.*;
 
-import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
 
-import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.transactions.TransactionIsolation.*;
 
 /**
  *
  */
-public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbstractTest {
+public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectFailoverAbstractTest {
     /** */
-    public final Integer THREADS = 8;
+    protected static final String ATOMIC_CACHE = "ATOMIC_CACHE";
 
     /** */
-    private volatile CyclicBarrier barrier;
-
-    /** */
-    private static final String ATOMIC_CACHE = "ATOMIC_CACHE";
-
-    /** */
-    private static final String TX_CACHE = "TX_CACHE";
+    protected static final String TX_CACHE = "TX_CACHE";
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -72,21 +59,6 @@ public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbst
         return cfg;
     }
 
-    /** {@inheritDoc} */
-    @Override protected int serverCount() {
-        return 3;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int clientCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 2 * 60_000;
-    }
-
     /**
      * @throws Exception If failed.
      */
@@ -199,138 +171,33 @@ public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbst
     }
 
     /**
-     * @param c Test closure.
      * @throws Exception If failed.
      */
-    public void reconnectFailover(final Callable<Void> c) throws Exception {
+    public void testReconnectStreamerApi() throws Exception {
         final Ignite client = grid(serverCount());
 
-        assertTrue(client.cluster().localNode().isClient());
-
-        Ignite srv = clientRouter(client);
-
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
-        final AtomicBoolean stop = new AtomicBoolean(false);
-
-        final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                try {
-                    int iter = 0;
-
-                    while (!stop.get()) {
-                        try {
-                            c.call();
-                        }
-                        catch (CacheException e) {
-                            checkAndWait(e);
-                        }
-                        catch (IgniteClientDisconnectedException e) {
-                            checkAndWait(e);
-                        }
-
-                        if (++iter % 100 == 0)
-                            log.info("Iteration: " + iter);
-
-                        if (barrier != null)
-                            barrier.await();
-                    }
-
-                    return null;
-                }
-                catch (Throwable e) {
-                    stop.set(true);
-
-                    log.error("Unexpected error: " + e, e);
-
-                    throw e;
-                }
-            }
-        }, THREADS, "test-operation-thread");
-
-        final AtomicReference<CountDownLatch> reconnected = new AtomicReference<>();
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    CountDownLatch latch = reconnected.get();
-
-                    assertNotNull(latch);
-                    assertEquals(1, latch.getCount());
+        reconnectFailover(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                stream(ATOMIC_CACHE);
 
-                    latch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
-                    info("Disconnected: " + evt);
+                stream(TX_CACHE);
 
-                return true;
+                return null;
             }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        try {
-            long stopTime = System.currentTimeMillis() + 60_000;
-
-            String err = null;
 
-            while (System.currentTimeMillis() < stopTime && !fut.isDone()) {
-                U.sleep(100);
-
-                CountDownLatch latch = new CountDownLatch(1);
-
-                reconnected.set(latch);
-
-                UUID nodeId = client.cluster().localNode().id();
-
-                log.info("Fail client: " + nodeId);
-
-                srvSpi.failNode(nodeId, null);
-
-                if (!latch.await(5000, MILLISECONDS)) {
-                    err = "Failed to wait for reconnect";
-
-                    break;
-                }
+            private void stream(String cacheName) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
-                    @Override public void run() {
-                        barrier = null;
-                    }
-                });
+                try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(cacheName)) {
+                    streamer.allowOverwrite(true);
 
-                try {
-                    barrier.await(10, SECONDS);
-                }
-                catch (TimeoutException e) {
-                    err = "Operation hangs.";
+                    streamer.perNodeBufferSize(10);
 
-                    break;
+                    for (int i = 0; i < 100; i++)
+                        streamer.addData(rnd.nextInt(100_000), 0);
                 }
             }
-
-            if (err != null) {
-                log.error(err);
-
-                U.dumpThreads(log);
-
-                CyclicBarrier barrier0 = barrier;
-
-                if (barrier0 != null)
-                    barrier0.reset();
-
-                stop.set(true);
-
-                fail(err);
-            }
-
-            stop.set(true);
-
-            fut.get();
-        }
-        finally {
-            stop.set(true);
-        }
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 27c2a61..a392245 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -62,6 +62,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setClientReconnectDisabled(true);
 
         if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName))
             cfg.setClientMode(true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
index 071341e..8703d32 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -27,7 +26,7 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
-import javax.cache.Cache;
+import javax.cache.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -89,7 +88,8 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest {
                             @Override public void apply(IgniteFuture<?> f) {
                                 try {
                                     f.get();
-                                } catch (IgniteException ignore) {
+                                }
+                                catch (CacheException ignore) {
                                     // This may be debugged.
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
index af3ea9d..30bf5dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.transactions.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.indexing.*;
 import org.apache.ignite.testframework.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 55fae9b..ba38dfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1332,6 +1332,27 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectDisabled() throws Exception {
+        // TODO IGNITE-901.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDisconnectAfterNetworkTimeout() throws Exception {
+        // TODO IGNITE-901.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectSegmentedAfterJoinTimeout() throws Exception {
+        // TODO IGNITE-901.
+    }
+
+    /**
      * @param clientIdx Client index.
      * @param srvIdx Server index.
      * @throws Exception In case of error.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 64e16bf..b531c35 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -492,7 +492,7 @@ public class GridReduceQueryExecutor {
                 if (ctx.clientDisconnected()) {
                     throw new CacheException("Query was cancelled, client node disconnected.",
                         new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
-                        "Client node disconnected.", null));
+                        "Client node disconnected."));
                 }
 
                 Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
@@ -573,7 +573,17 @@ public class GridReduceQueryExecutor {
                 if (e instanceof CacheException)
                     throw (CacheException)e;
 
-                throw new CacheException("Failed to run reduce query locally.", e);
+                Throwable cause = e;
+
+                if (e instanceof IgniteCheckedException) {
+                    Throwable disconnectedErr =
+                        ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class);
+
+                    if (disconnectedErr != null)
+                        cause = disconnectedErr;
+                }
+
+                throw new CacheException("Failed to run reduce query locally.", cause);
             }
             finally {
                 if (!runs.remove(qryReqId, r))
@@ -1109,7 +1119,7 @@ public class GridReduceQueryExecutor {
      */
     public void onDisconnected(IgniteFuture<?> reconnectFut) {
         CacheException err = new CacheException("Query was cancelled, client node disconnected.",
-            new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.", null));
+            new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."));
 
         for (Map.Entry<Long, QueryRun> e : runs.entrySet())
             e.getValue().state(err, null);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/86d963f9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
deleted file mode 100644
index b0dc965..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/IgniteClientReconnectQueriesTest.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.cache.query.annotations.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.testframework.*;
-
-import javax.cache.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- *
- */
-public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest {
-    /** */
-    public static final String QUERY_CACHE = "query";
-
-    /** {@inheritDoc} */
-    @Override protected int serverCount() {
-        return 3;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int clientCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>(QUERY_CACHE)
-            .setCacheMode(PARTITIONED)
-            .setAtomicityMode(ATOMIC)
-            .setBackups(1)
-            .setIndexedTypes(Integer.class, Person.class);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
-        grid(0).getOrCreateCache(QUERY_CACHE).removeAll();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testQueryReconnect() throws Exception {
-        Ignite cln = grid(serverCount());
-
-        assertTrue(cln.cluster().localNode().isClient());
-
-        final Ignite srv = clientRouter(cln);
-
-        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
-
-        final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
-
-        clnCache.put(1, new Person(1, "name1", "surname1"));
-        clnCache.put(2, new Person(2, "name2", "surname2"));
-        clnCache.put(3, new Person(3, "name3", "surname3"));
-
-        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
-
-        qry.setPageSize(1);
-
-        QueryCursor<Cache.Entry<Integer, Person>> cur = clnCache.query(qry);
-
-        reconnectClientNode(cln, srv, new Runnable() {
-            @Override public void run() {
-                srvCache.put(4, new Person(4, "name4", "surname4"));
-
-                try {
-                    clnCache.query(qry);
-
-                    fail();
-                } catch (CacheException e) {
-                    check(e);
-                }
-            }
-        });
-
-        List<Cache.Entry<Integer, Person>> res = cur.getAll();
-
-        assertNotNull(res);
-        assertEquals(4, res.size());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReconnectQueryInProgress() throws Exception {
-        Ignite cln = grid(serverCount());
-
-        assertTrue(cln.cluster().localNode().isClient());
-
-        final Ignite srv = clientRouter(cln);
-
-        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
-
-        clnCache.put(1, new Person(1, "name1", "surname1"));
-        clnCache.put(2, new Person(2, "name2", "surname2"));
-        clnCache.put(3, new Person(3, "name3", "surname3"));
-
-        blockMessage(GridQueryNextPageResponse.class);
-
-        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
-
-        qry.setPageSize(1);
-
-        final QueryCursor<Cache.Entry<Integer, Person>> cur1 = clnCache.query(qry);
-
-        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                try {
-                    cur1.getAll();
-                }
-                catch (CacheException e) {
-                    checkAndWait(e);
-
-                    return true;
-                }
-
-                return false;
-            }
-        });
-
-        // Check that client waiting operation.
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                return fut.get(200);
-            }
-        }, IgniteFutureTimeoutCheckedException.class, null);
-
-        assertNotDone(fut);
-
-        unblockMessage();
-
-        reconnectClientNode(cln, srv, null);
-
-        assertTrue((Boolean) fut.get(2, SECONDS));
-
-        QueryCursor<Cache.Entry<Integer, Person>> cur2 = clnCache.query(qry);
-
-        assertEquals(3, cur2.getAll().size());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQueryReconnect() throws Exception {
-        Ignite cln = grid(serverCount());
-
-        assertTrue(cln.cluster().localNode().isClient());
-
-        final Ignite srv = clientRouter(cln);
-
-        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
-
-        final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
-
-        for (int i = 0; i < 100; i++)
-            clnCache.put(i, new Person(i, "name-" + i, "surname-" + i));
-
-        final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
-
-        scanQry.setPageSize(1);
-
-        scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
-            @Override public boolean apply(Integer integer, Person person) {
-                return true;
-            }
-        });
-
-        QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
-
-        reconnectClientNode(cln, srv, new Runnable() {
-            @Override public void run() {
-                srvCache.put(1000, new Person(1000, "name", "surname"));
-
-                try {
-                    clnCache.query(scanQry);
-
-                    fail();
-                }
-                catch (CacheException e) {
-                    check(e);
-                }
-            }
-        });
-
-        try {
-            qryCursor.getAll();
-
-            fail();
-        }
-        catch (CacheException e) {
-            checkAndWait(e);
-        }
-
-        qryCursor = clnCache.query(scanQry);
-
-        assertEquals(101, qryCursor.getAll().size());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQueryReconnectInProgress1() throws Exception {
-        scanQueryReconnectInProgress(false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testScanQueryReconnectInProgress2() throws Exception {
-        scanQueryReconnectInProgress(true);
-    }
-
-    /**
-     * @param setPart If {@code true} sets partition for scan query.
-     * @throws Exception If failed.
-     */
-    private void scanQueryReconnectInProgress(boolean setPart) throws Exception {
-        Ignite cln = grid(serverCount());
-
-        assertTrue(cln.cluster().localNode().isClient());
-
-        final Ignite srv = clientRouter(cln);
-
-        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
-
-        clnCache.put(1, new Person(1, "name1", "surname1"));
-        clnCache.put(2, new Person(2, "name2", "surname2"));
-        clnCache.put(3, new Person(3, "name3", "surname3"));
-
-        final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
-
-        scanQry.setPageSize(1);
-
-        scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
-            @Override public boolean apply(Integer integer, Person person) {
-                return true;
-            }
-        });
-
-        if (setPart)
-            scanQry.setPartition(1);
-
-        blockMessage(GridCacheQueryResponse.class);
-
-        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                try {
-                    QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
-
-                    qryCursor.getAll();
-                }
-                catch (CacheException e) {
-                    checkAndWait(e);
-
-                    return true;
-                }
-
-                return false;
-            }
-        });
-
-        // Check that client waiting operation.
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                return fut.get(200);
-            }
-        }, IgniteFutureTimeoutCheckedException.class, null);
-
-        assertNotDone(fut);
-
-        unblockMessage();
-
-        reconnectClientNode(cln, srv, null);
-
-        assertTrue((Boolean)fut.get(2, SECONDS));
-
-        QueryCursor<Cache.Entry<Integer, Person>> qryCursor2 = clnCache.query(scanQry);
-
-        assertEquals(setPart ? 1 : 3, qryCursor2.getAll().size());
-    }
-
-    /**
-     * @param clazz Message class.
-     */
-    private void blockMessage(Class<?> clazz) {
-        for (int i = 0; i < serverCount(); i++) {
-            BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
-
-            commSpi.blockMessage(clazz);
-        }
-    }
-
-    /**
-     *
-     */
-    private void unblockMessage() {
-        for (int i = 0; i < serverCount(); i++) {
-            BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
-
-            commSpi.unblockMessage();
-        }
-    }
-
-    /**
-     *
-     */
-    public static class Person {
-        /** */
-        @QuerySqlField
-        public int id;
-
-        /** */
-        @QuerySqlField
-        public String name;
-
-        /** */
-        @QuerySqlField
-        public String surname;
-
-        /**
-         * @param id Id.
-         * @param name Name.
-         * @param surname Surname.
-         */
-        public Person(int id, String name, String surname) {
-            this.id = id;
-            this.name = name;
-            this.surname = surname;
-        }
-
-        /**
-         * @return Id.
-         */
-        public int getId() {
-            return id;
-        }
-
-        /**
-         * @param id Set id.
-         */
-        public void setId(int id) {
-            this.id = id;
-        }
-
-        /**
-         * @return Name.
-         */
-        public String getName() {
-            return name;
-        }
-
-        /**
-         * @param name Name.
-         */
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        /**
-         * @return Surname.
-         */
-        public String getSurname() {
-            return surname;
-        }
-
-        /**
-         * @param surname Surname.
-         */
-        public void setSurname(String surname) {
-            this.surname = surname;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return id;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Person.class, this);
-        }
-    }
-}