You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/17 15:50:19 UTC

[ignite] branch master updated: IGNITE-9171: SQL: redesigned lazy mode. This closes #5473.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e6a5690  IGNITE-9171: SQL: redesigned lazy mode. This closes #5473.
e6a5690 is described below

commit e6a56902c38ca001cba954af37787fa31cc750d1
Author: tledkov-gridgain <tl...@gridgain.com>
AuthorDate: Sun Feb 17 18:50:10 2019 +0300

    IGNITE-9171: SQL: redesigned lazy mode. This closes #5473.
---
 .../jdbc/thin/JdbcThinStatementCancelSelfTest.java |   4 +-
 .../org/apache/ignite/IgniteSystemProperties.java  |   7 +-
 .../ignite/cache/query/QueryRetryException.java}   |  22 +-
 .../apache/ignite/cache/query/SqlFieldsQuery.java  |   8 +-
 .../processors/cache/GridCacheEventManager.java    |   2 +-
 .../processors/odbc/ClientListenerProcessor.java   |   4 +-
 .../h2/twostep/messages/GridQueryFailResponse.java |  12 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |   5 +-
 .../processors/query/h2/ConnectionManager.java     |   5 +-
 .../processors/query/h2/H2ConnectionWrapper.java   |   5 +-
 .../processors/query/h2/H2FieldsIterator.java      |   5 +-
 .../internal/processors/query/h2/H2Utils.java      |  16 +
 .../processors/query/h2/IgniteH2Indexing.java      |  60 +-
 .../processors/query/h2/opt/GridH2Table.java       | 198 ++++-
 .../processors/query/h2/opt/QueryContext.java      |  21 -
 .../query/h2/opt/QueryContextRegistry.java         |   7 +-
 .../query/h2/twostep/GridMapQueryExecutor.java     | 538 ++++++-------
 .../query/h2/twostep/GridReduceQueryExecutor.java  |  61 +-
 .../query/h2/twostep/MapNodeResults.java           |  12 +-
 .../query/h2/twostep/MapQueryLazyWorker.java       | 200 -----
 .../query/h2/twostep/MapQueryLazyWorkerKey.java    |  97 ---
 .../query/h2/twostep/MapQueryResult.java           | 200 +++--
 .../query/h2/twostep/MapQueryResults.java          | 137 ++--
 .../query/h2/twostep/PartitionReservation.java     |  17 +-
 .../h2/twostep/PartitionReservationManager.java    |  18 +-
 .../query/h2/twostep/ReduceResultPage.java         |  22 +-
 .../apache/ignite/client/FunctionalQueryTest.java  |  71 +-
 .../GridCacheLazyQueryPartitionsReleaseTest.java   |   2 -
 ...tributedQueryStopOnCancelOrTimeoutSelfTest.java |  46 +-
 ...eCacheQueryAbstractDistributedJoinSelfTest.java |   5 +
 ...cheQueryNodeRestartDistributedJoinSelfTest.java |  16 +-
 ...opOnCancelOrTimeoutDistributedJoinSelfTest.java |  26 +-
 .../DynamicColumnsAbstractConcurrentSelfTest.java  |  28 +-
 .../cache/index/DynamicIndexAbstractSelfTest.java  |  37 +-
 .../cache/index/H2ConnectionLeaksSelfTest.java     |   2 +-
 ...niteCacheLocalQueryCancelOrTimeoutSelfTest.java |   3 +-
 ...actQueryTableLockAndConnectionPoolSelfTest.java | 854 +++++++++++++++++++++
 ...TableLockAndConnectionPoolLazyModeOffTest.java} |  16 +-
 ...yTableLockAndConnectionPoolLazyModeOnTest.java} |  16 +-
 .../processors/query/LazyQuerySelfTest.java        | 392 ----------
 .../processors/query/h2/QueryDataPageScanTest.java |   2 +-
 .../DisappearedCacheCauseRetryMessageSelfTest.java |  15 +-
 .../h2/twostep/RetryCauseMessageSelfTest.java      |  26 -
 .../processors/query/oom/AbstractQueryOOMTest.java | 429 +++++++++++
 .../query/oom/IgniteQueryOOMTestSuite.java}        |  13 +-
 .../oom/QueryOOMWithQueryParallelismTest.java}     |  15 +-
 .../oom/QueryOOMWithoutQueryParallelismTest.java}  |  15 +-
 .../IgniteBinaryCacheQueryTestSuite.java           |   6 +-
 .../IgniteBinaryCacheQueryLazyTestSuite.java}      |  21 +-
 .../IgniteBinaryCacheQueryLazyTestSuite2.java}     |  21 +-
 .../Cache/Query/CacheQueriesTest.cs                |   6 +-
 .../Client/Cache/SqlQueryTest.cs                   |   4 +-
 .../nodejs/spec/query/SqlFieldsQuery.spec.js       |  23 +-
 modules/platforms/php/tests/SqlFieldsQueryTest.php |  32 +-
 .../benchmark-native-sql-select-join.properties    |   6 +-
 .../config/benchmark-native-sql-select.properties  |   6 +-
 .../apache/ignite/yardstick/jdbc/JdbcUtils.java    |  62 +-
 .../jdbc/NativeSqlJoinQueryRangeBenchmark.java     |  80 +-
 58 files changed, 2487 insertions(+), 1492 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
index 6bc1bb0..4d54456 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementCancelSelfTest.java
@@ -439,9 +439,9 @@ public class JdbcThinStatementCancelSelfTest extends JdbcThinAbstractSelfTest {
             IgniteInternalFuture cancelRes = cancel(stmt);
 
             GridTestUtils.assertThrows(log, () -> {
-                stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)");
+                stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30) OR _key < " + MAX_ROWS);
                 stmt.addBatch("update Long set _val = _val + 1 where awaitLatchCancelled() = 0");
-                stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)");
+                stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30) OR _key < " + MAX_ROWS);
                 stmt.addBatch("update Long set _val = _val + 1 where shouldNotBeCalledInCaseOfCancellation()");
 
                 stmt.executeBatch();
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 8438968..64982d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -492,7 +492,12 @@ public final class IgniteSystemProperties {
     /** Disable fallback to H2 SQL parser if the internal SQL parser fails to parse the statement. */
     public static final String IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK = "IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK";
 
-    /** Force all SQL queries to be processed lazily regardless of what clients request. */
+    /**
+     *  Force all SQL queries to be processed lazily regardless of what clients request.
+     *
+     * @deprecated Since version 2.8.
+     */
+    @Deprecated
     public static final String IGNITE_SQL_FORCE_LAZY_RESULT_SET = "IGNITE_SQL_FORCE_LAZY_RESULT_SET";
 
     /** Disable SQL system views. */
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryRetryException.java
similarity index 62%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/core/src/main/java/org/apache/ignite/cache/query/QueryRetryException.java
index 0a6181e..10afa00 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryRetryException.java
@@ -15,15 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.testsuites;
+package org.apache.ignite.cache.query;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+import org.apache.ignite.IgniteException;
 
 /**
- * Special test suite with ignored tests for Binary mode.
+ * The exception is thrown if a query was cancelled or timed out while executing.
  */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
-}
+public class QueryRetryException extends IgniteException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param tableName Table name.
+     */
+    public QueryRetryException(String tableName) {
+        super("Table was modified concurrently (please retry the query): " + tableName);
+    }
+}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index fe283e9..aa66716 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -51,6 +51,10 @@ public class SqlFieldsQuery extends Query<List<?>> {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Do not remove. For tests only. */
+    @SuppressWarnings("NonConstantFieldWithUpperCaseName")
+    private static boolean DFLT_LAZY;
+
     /** SQL Query. */
     private String sql;
 
@@ -73,8 +77,8 @@ public class SqlFieldsQuery extends Query<List<?>> {
     /** */
     private boolean replicatedOnly;
 
-    /** */
-    private boolean lazy;
+    /** Lazy mode is default since Ignite v.2.8. */
+    private boolean lazy = DFLT_LAZY;
 
     /** Partitions for query */
     private int[] parts;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 726a6c8..501da08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -392,7 +392,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
             return false;
 
         return cctx0 != null && cctx0.userCache() && cctx0.gridEvents().isRecordable(type)
-            && !cctx0.config().isEventsDisabled();
+            && cctx0.config() != null && !cctx0.config().isEventsDisabled();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
index 5870f40..c988f33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
@@ -284,9 +284,9 @@ public class ClientListenerProcessor extends GridProcessorAdapter {
                     }
 
                     if (connCtx.handler().isCancellationCommand(cmdType)) {
-                        proceedMessageReceived(ses, msg);
-
                         CANCEL_COUNTER.incrementAndGet();
+
+                        proceedMessageReceived(ses, msg);
                     }
                     else {
                         connCtx.handler().registerRequest(reqId, cmdType);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
index ef26d2a..7d66e00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.QueryRetryException;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -34,6 +35,9 @@ public class GridQueryFailResponse implements Message {
     /** Cancelled by originator failure type. */
     public static final byte CANCELLED_BY_ORIGINATOR = 1;
 
+    /** Execution error. Query should be retried. */
+    public static final byte RETRY_QUERY = 2;
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -60,7 +64,13 @@ public class GridQueryFailResponse implements Message {
     public GridQueryFailResponse(long qryReqId, Throwable err) {
         this.qryReqId = qryReqId;
         this.errMsg = err.getMessage();
-        this.failCode = err instanceof QueryCancelledException ? CANCELLED_BY_ORIGINATOR : GENERAL_ERROR;
+
+        if (err instanceof QueryCancelledException)
+            this.failCode = CANCELLED_BY_ORIGINATOR;
+        else if (err instanceof QueryRetryException)
+            this.failCode = RETRY_QUERY;
+        else
+            this.failCode = GENERAL_ERROR;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 8fb0a70..c683827 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -4096,13 +4096,14 @@ public abstract class IgniteUtils {
      * @param log Logger to log possible checked exception with (optional).
      */
     public static void close(@Nullable AutoCloseable rsrc, @Nullable IgniteLogger log) {
-        if (rsrc != null)
+        if (rsrc != null) {
             try {
                 rsrc.close();
             }
             catch (Exception e) {
-                warn(log, "Failed to close resource: " + e.getMessage());
+                warn(log, "Failed to close resource: " + e.getMessage(), e);
             }
+        }
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
index 18a842b..db67edf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
@@ -407,6 +407,9 @@ public class ConnectionManager {
         if (connCleanupTask != null)
             connCleanupTask.close();
 
+        // Needs to be released before SHUTDOWN.
+        closeConnections();
+
         try (Connection c = connectionNoCache(QueryUtils.SCHEMA_INFORMATION); Statement s = c.createStatement()) {
             s.execute("SHUTDOWN");
         }
@@ -419,8 +422,6 @@ public class ConnectionManager {
 
             sysConn = null;
         }
-
-        closeConnections();
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
index 9803b5d..074d62a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
@@ -126,9 +126,8 @@ public class H2ConnectionWrapper implements AutoCloseable {
         return S.toString(H2ConnectionWrapper.class, this);
     }
 
-    /** Closes wrapped connection */
-    @Override
-    public void close() {
+    /** Closes wrapped connection. */
+    @Override public void close() {
         U.closeQuiet(conn);
     }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
index 64647cb..cdafb65 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
@@ -49,6 +49,8 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> {
         throws IgniteCheckedException {
         super(data, forUpdate);
 
+        assert detachedConn != null;
+
         this.mvccTracker = mvccTracker;
         this.detachedConn = detachedConn;
     }
@@ -68,8 +70,7 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> {
             super.onClose();
         }
         finally {
-            if (detachedConn != null)
-                detachedConn.recycle();
+            detachedConn.recycle();
 
             if (mvccTracker != null)
                 mvccTracker.onDone();
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index 4a73a1b..a9abc2b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -357,10 +357,26 @@ public class H2Utils {
      * @param enforceJoinOrder Enforce join order of tables.
      */
     public static void setupConnection(Connection conn, boolean distributedJoins, boolean enforceJoinOrder) {
+        setupConnection(conn,distributedJoins, enforceJoinOrder, false);
+    }
+
+    /**
+     * @param conn Connection to use.
+     * @param distributedJoins If distributed joins are enabled.
+     * @param enforceJoinOrder Enforce join order of tables.
+     * @param lazy Lazy query execution mode.
+     */
+    public static void setupConnection(
+        Connection conn,
+        boolean distributedJoins,
+        boolean enforceJoinOrder,
+        boolean lazy
+    ) {
         Session s = session(conn);
 
         s.setForceJoinOrder(enforceJoinOrder);
         s.setJoinBatchEnabled(distributedJoins);
+        s.setLazyQueryExecution(lazy);
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 0a60018..ea7c3a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -119,7 +119,6 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
-import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
 import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
@@ -869,31 +868,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
         int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
-        final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker();
-
-        if (cancel != null) {
-            cancel.set(new Runnable() {
-                @Override public void run() {
-                    if (lazyWorker != null) {
-                        lazyWorker.submit(new Runnable() {
-                            @Override public void run() {
-                                cancelStatement(stmt);
-                            }
-                        });
-                    }
-                    else
-                        cancelStatement(stmt);
-                }
-            });
-        }
+        if (cancel != null)
+            cancel.set(() -> cancelStatement(stmt));
 
         Session ses = session(conn);
 
         if (timeoutMillis > 0)
             ses.setQueryTimeout(timeoutMillis);
-
-        if (lazyWorker != null)
-            ses.setLazyQueryExecution(true);
+        else
+            ses.setQueryTimeout(0);
 
         try {
             return stmt.executeQuery();
@@ -905,13 +888,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             throw new IgniteCheckedException("Failed to execute SQL query. " + e.getMessage(), e);
         }
-        finally {
-            if (timeoutMillis > 0)
-                ses.setQueryTimeout(0);
-
-            if (lazyWorker != null)
-                ses.setLazyQueryExecution(false);
-        }
     }
 
     /**
@@ -991,20 +967,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             long time = U.currentTimeMillis() - start;
 
-            long longQryExecTimeout = ctx.config().getLongQueryWarningTimeout();
-
-            if (time > longQryExecTimeout) {
-                ResultSet plan = executeSqlQuery(conn, preparedStatementWithParams(conn, "EXPLAIN " + sql,
-                    params, false), 0, null);
+            if (time > ctx.config().getLongQueryWarningTimeout()) {
+                // In lazy mode we have to use separate connection to gather plan to print warning.
+                // Otherwise the all tables are unlocked by this query.
+                try (Connection planConn = connMgr.connectionNoCache(conn.getSchema())) {
+                    ResultSet plan = executeSqlQuery(planConn, preparedStatementWithParams(planConn, "EXPLAIN " + sql,
+                        params, false), 0, null);
 
-                plan.next();
+                    plan.next();
 
-                // Add SQL explain result message into log.
-                String msg = "Query execution is too long [time=" + time + " ms, sql='" + sql + '\'' +
-                    ", plan=" + U.nl() + plan.getString(1) + U.nl() + ", parameters=" +
-                    (params == null ? "[]" : Arrays.deepToString(params.toArray())) + "]";
+                    // Add SQL explain result message into log.
+                    String msg = "Query execution is too long [time=" + time + " ms, sql='" + sql + '\'' +
+                        ", plan=" + U.nl() + plan.getString(1) + U.nl() + ", parameters=" +
+                        (params == null ? "[]" : Arrays.deepToString(params.toArray())) + "]";
 
-                LT.warn(log, msg);
+                    LT.warn(log, msg);
+                }
             }
 
             return rs;
@@ -2131,7 +2109,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Stopping cache query index...");
 
-        mapQryExec.cancelLazyWorkers();
+        mapQryExec.stop();
 
         qryCtxRegistry.clearSharedOnLocalNodeStop();
 
@@ -2285,8 +2263,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public void onKernalStop() {
-        mapQryExec.cancelLazyWorkers();
-
         connMgr.onKernalStop();
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 173e4a4..e04fab1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -24,11 +24,13 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.cache.query.QueryRetryException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -40,7 +42,6 @@ import org.apache.ignite.internal.processors.query.h2.H2TableDescriptor;
 import org.apache.ignite.internal.processors.query.h2.IndexRebuildPartialClosure;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndexBase;
-import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.command.dml.Insert;
@@ -56,6 +57,7 @@ import org.h2.result.SortOrder;
 import org.h2.schema.SchemaObject;
 import org.h2.table.Column;
 import org.h2.table.IndexColumn;
+import org.h2.table.Table;
 import org.h2.table.TableBase;
 import org.h2.table.TableType;
 import org.h2.value.DataType;
@@ -70,6 +72,9 @@ public class GridH2Table extends TableBase {
     /** Insert hack flag. */
     private static final ThreadLocal<Boolean> INSERT_HACK = new ThreadLocal<>();
 
+    /** Exclusive lock constant. */
+    private static final long EXCLUSIVE_LOCK = -1;
+
     /** Cache context info. */
     private final GridCacheContextInfo cacheInfo;
 
@@ -92,10 +97,14 @@ public class GridH2Table extends TableBase {
     private final ReentrantReadWriteLock lock;
 
     /** */
-    private boolean destroyed;
+    private volatile boolean destroyed;
 
-    /** */
-    private final ConcurrentMap<Session, Boolean> sessions = new ConcurrentHashMap<>();
+    /**
+     * Map of sessions locks.
+     * Session -> EXCLUSIVE_LOCK (-1L) - for exclusive locks.
+     * Session -> (table version) - for shared locks.
+     */
+    private final ConcurrentMap<Session, SessionLock> sessions = new ConcurrentHashMap<>();
 
     /** */
     private final IndexColumn affKeyCol;
@@ -121,6 +130,9 @@ public class GridH2Table extends TableBase {
     /** Columns with thread-safe access. */
     private volatile Column[] safeColumns;
 
+    /** Table version. The version is changed when exclusive lock is acquired (DDL operation is started). */
+    private final AtomicLong ver = new AtomicLong();
+
     /**
      * Creates table.
      *
@@ -350,10 +362,17 @@ public class GridH2Table extends TableBase {
     /** {@inheritDoc} */
     @Override public boolean lock(Session ses, boolean exclusive, boolean force) {
         // In accordance with base method semantics, we'll return true if we were already exclusively locked.
-        Boolean res = sessions.get(ses);
+        SessionLock sesLock = sessions.get(ses);
+
+        if (sesLock != null) {
+            if (sesLock.isExclusive())
+                return true;
+
+            if (ver.get() != sesLock.version())
+                throw new QueryRetryException(getName());
 
-        if (res != null)
-            return res;
+            return false;
+        }
 
         // Acquire the lock.
         lock(exclusive);
@@ -365,25 +384,53 @@ public class GridH2Table extends TableBase {
         }
 
         // Mutate state.
-        sessions.put(ses, exclusive);
+        sessions.put(ses, exclusive ? SessionLock.exclusiveLock() : SessionLock.sharedLock(ver.longValue()));
 
         ses.addLock(this);
 
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public void unlock(Session ses) {
+        SessionLock sesLock = sessions.remove(ses);
+
+        if (sesLock.locked)
+            unlock(sesLock.isExclusive());
+    }
+
     /**
-     * @return Table identifier.
+     * @param ses H2 session.
      */
-    public QueryTable identifier() {
-        return identifier;
+    private void readLockInternal(Session ses) {
+        SessionLock sesLock = sessions.get(ses);
+
+        assert sesLock != null && !sesLock.isExclusive()
+            : "Invalid table lock [name=" + getName() + ", lock=" + sesLock.ver + ']';
+
+        if (!sesLock.locked) {
+            lock(false);
+
+            sesLock.locked = true;
+        }
     }
 
     /**
-     * @return Table identifier as string.
+     * Release table lock.
+     *
+     * @param ses H2 session.
      */
-    public String identifierString() {
-        return identifierStr;
+    private void unlockReadInternal(Session ses) {
+        SessionLock sesLock = sessions.get(ses);
+
+        assert sesLock != null && !sesLock.isExclusive()
+            : "Invalid table unlock [name=" + getName() + ", lock=" + sesLock.ver + ']';
+
+        if (sesLock.locked) {
+            sesLock.locked = false;
+
+            unlock(false);
+        }
     }
 
     /**
@@ -396,7 +443,7 @@ public class GridH2Table extends TableBase {
         Lock l = exclusive ? lock.writeLock() : lock.readLock();
 
         try {
-            if (!exclusive || !GridMapQueryExecutor.FORCE_LAZY)
+            if (!exclusive)
                 l.lockInterruptibly();
             else {
                 for (;;) {
@@ -405,6 +452,8 @@ public class GridH2Table extends TableBase {
                     else
                         Thread.yield();
                 }
+
+                ver.incrementAndGet();
             }
         }
         catch (InterruptedException e) {
@@ -426,6 +475,33 @@ public class GridH2Table extends TableBase {
     }
 
     /**
+     * @param ses H2 session.
+     */
+    private void checkVersion(Session ses) {
+        SessionLock sesLock = sessions.get(ses);
+
+        assert sesLock != null && !sesLock.isExclusive()
+            : "Invalid table check version  [name=" + getName() + ", lock=" + sesLock.ver + ']';
+
+        if (ver.longValue() != sesLock.version())
+            throw new QueryRetryException(getName());
+    }
+
+    /**
+     * @return Table identifier.
+     */
+    public QueryTable identifier() {
+        return identifier;
+    }
+
+    /**
+     * @return Table identifier as string.
+     */
+    public String identifierString() {
+        return identifierStr;
+    }
+
+    /**
      * Check if table is not destroyed.
      */
     private void ensureNotDestroyed() {
@@ -485,8 +561,6 @@ public class GridH2Table extends TableBase {
         try {
             ensureNotDestroyed();
 
-            assert sessions.isEmpty() : sessions;
-
             destroyed = true;
 
             for (int i = 1, len = idxs.size(); i < len; i++)
@@ -507,16 +581,6 @@ public class GridH2Table extends TableBase {
         this.rmIndex = rmIndex;
     }
 
-    /** {@inheritDoc} */
-    @Override public void unlock(Session ses) {
-        Boolean exclusive = sessions.remove(ses);
-
-        if (exclusive == null)
-            return;
-
-        unlock(exclusive);
-    }
-
     /**
      * Gets index by index.
      *
@@ -1213,4 +1277,84 @@ public class GridH2Table extends TableBase {
 
         return true;
     }
+
+    /**
+     * @param s H2 session.
+     */
+    public static void unlockTables(Session s) {
+        for (Table t : s.getLocks()) {
+            if (t instanceof GridH2Table)
+                ((GridH2Table)t).unlockReadInternal(s);
+        }
+    }
+
+    /**
+     * @param s H2 session.
+     */
+    public static void readLockTables(Session s) {
+        for (Table t : s.getLocks()) {
+            if (t instanceof GridH2Table)
+                ((GridH2Table)t).readLockInternal(s);
+        }
+    }
+
+    /**
+     * @param s H2 session.
+     */
+    public static void checkTablesVersions(Session s) {
+        for (Table t : s.getLocks()) {
+            if (t instanceof GridH2Table)
+                ((GridH2Table)t).checkVersion(s);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class SessionLock {
+        /** Version. */
+        final long ver;
+
+        /** Locked by current thread flag. */
+        boolean locked;
+
+        /**
+         * Constructor for shared lock.
+         *
+         * @param ver Table version.
+         */
+        private SessionLock(long ver) {
+            this.ver = ver;
+            locked = true;
+        }
+
+        /**
+         * @param ver Locked table version.
+         * @return Shared lock instance.
+         */
+        static SessionLock sharedLock(long ver) {
+            return new SessionLock(ver);
+        }
+
+        /**
+         * @return Exclusive lock instance.
+         */
+        static SessionLock exclusiveLock() {
+            return new SessionLock(EXCLUSIVE_LOCK);
+        }
+
+        /**
+         * @return {@code true} if exclusive lock.
+         */
+        boolean isExclusive() {
+            return ver == EXCLUSIVE_LOCK;
+        }
+
+        /**
+         * @return Table version of the first lock.
+         */
+        long version() {
+            return ver;
+        }
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
index ca0c940..a697bf3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
-import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
 import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -44,9 +43,6 @@ public class QueryContext {
     /** */
     private final PartitionReservation reservations;
 
-    /** */
-    private MapQueryLazyWorker lazyWorker;
-
     /**
      * Constructor.
      *
@@ -110,23 +106,6 @@ public class QueryContext {
         return filter;
     }
 
-    /**
-     * @return Lazy worker, if any, or {@code null} if none.
-     */
-    public MapQueryLazyWorker lazyWorker() {
-        return lazyWorker;
-    }
-
-    /**
-     * @param lazyWorker Lazy worker, if any, or {@code null} if none.
-     * @return {@code this}.
-     */
-    public QueryContext lazyWorker(MapQueryLazyWorker lazyWorker) {
-        this.lazyWorker = lazyWorker;
-
-        return this;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(QueryContext.class, this);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java
index 8275d6e..34acbbf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java
@@ -58,8 +58,6 @@ public class QueryContextRegistry {
      * Drops current thread local context.
      */
     public void clearThreadLocal() {
-        assert locCtx.get() != null;
-
         locCtx.remove();
     }
 
@@ -140,10 +138,7 @@ public class QueryContextRegistry {
         if (ctx == null)
             return false;
 
-        if (ctx.lazyWorker() != null)
-            ctx.lazyWorker().stop(nodeStop);
-        else
-            ctx.clearContext(nodeStop);
+        ctx.clearContext(nodeStop);
 
         return true;
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index e6bb564..e132b1a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -32,14 +31,13 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.QueryRetryException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
@@ -63,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
@@ -88,16 +87,15 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.apache.ignite.thread.IgniteThread;
+import org.h2.api.ErrorCode;
 import org.h2.command.Prepared;
 import org.h2.jdbc.JdbcResultSet;
+import org.h2.jdbc.JdbcSQLException;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
-
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
 
@@ -107,9 +105,6 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2V
 @SuppressWarnings("ForLoopReplaceableByForEach")
 public class GridMapQueryExecutor {
     /** */
-    public static final boolean FORCE_LAZY = IgniteSystemProperties.getBoolean(IGNITE_SQL_FORCE_LAZY_RESULT_SET);
-
-    /** */
     private IgniteLogger log;
 
     /** */
@@ -127,15 +122,6 @@ public class GridMapQueryExecutor {
     /** */
     private final GridSpinBusyLock busyLock;
 
-    /** Lazy workers. */
-    private final ConcurrentHashMap<MapQueryLazyWorkerKey, MapQueryLazyWorker> lazyWorkers = new ConcurrentHashMap<>();
-
-    /** Busy lock for lazy workers. */
-    private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock();
-
-    /** Lazy worker stop guard. */
-    private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean();
-
     /**
      * @param busyLock Busy lock.
      */
@@ -191,18 +177,11 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * Cancel active lazy queries and prevent submit of new queries.
+     * Stop query map executor, cleanup resources.
      */
-    public void cancelLazyWorkers() {
-        if (!lazyWorkerStopGuard.compareAndSet(false, true))
-            return;
-
-        lazyWorkerBusyLock.block();
-
-        for (MapQueryLazyWorker worker : lazyWorkers.values())
-            worker.stop(false);
-
-        lazyWorkers.clear();
+    public void stop() {
+        for (MapNodeResults res : qryRess.values())
+            res.cancelAll();
     }
 
     /**
@@ -240,13 +219,6 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * @return Busy lock for lazy workers to guard their operations with.
-     */
-    GridSpinBusyLock busyLock() {
-        return busyLock;
-    }
-
-    /**
      * @param node Node.
      * @param msg Message.
      */
@@ -288,6 +260,7 @@ public class GridMapQueryExecutor {
     /**
      * @param node Node.
      * @param req Query request.
+     * @throws IgniteCheckedException On error.
      */
     private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
         int[] qryParts = req.queryPartitions();
@@ -296,11 +269,15 @@ public class GridMapQueryExecutor {
 
         final int[] parts = qryParts == null ? partsMap == null ? null : partsMap.get(ctx.localNodeId()) : qryParts;
 
+        final GridDhtTxLocalAdapter tx;
+
+        GridH2SelectForUpdateTxDetails txReq = req.txDetails();
+
         boolean distributedJoins = req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS);
         boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
         boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
         boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
-        boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
+        final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY) && txReq == null;
 
         Boolean dataPageScanEnabled = req.isDataPageScanEnabled();
 
@@ -311,10 +288,6 @@ public class GridMapQueryExecutor {
 
         final Object[] params = req.parameters();
 
-        final GridDhtTxLocalAdapter tx;
-
-        GridH2SelectForUpdateTxDetails txReq = req.txDetails();
-
         try {
             if (txReq != null) {
                 // Prepare to run queries.
@@ -380,62 +353,36 @@ public class GridMapQueryExecutor {
 
             final int segment = i;
 
-            if (lazy) {
-                onQueryRequest0(node,
-                    req.requestId(),
-                    segment,
-                    req.schemaName(),
-                    req.queries(),
-                    cacheIds,
-                    req.topologyVersion(),
-                    partsMap,
-                    parts,
-                    req.pageSize(),
-                    distributedJoins,
-                    enforceJoinOrder,
-                    false, // Replicated is always false here (see condition above).
-                    req.timeout(),
-                    params,
-                    true,
-                    req.mvccSnapshot(),
-                    tx,
-                    txReq,
-                    lockFut,
-                    runCntr,
-                    dataPageScanEnabled);
-            }
-            else {
-                ctx.closure().callLocal(
-                    new Callable<Void>() {
-                        @Override public Void call() {
-                            onQueryRequest0(node,
-                                req.requestId(),
-                                segment,
-                                req.schemaName(),
-                                req.queries(),
-                                cacheIds,
-                                req.topologyVersion(),
-                                partsMap,
-                                parts,
-                                req.pageSize(),
-                                distributedJoins,
-                                enforceJoinOrder,
-                                false,
-                                req.timeout(),
-                                params,
-                                false,
-                                req.mvccSnapshot(),
-                                tx,
-                                txReq,
-                                lockFut,
-                                runCntr,
-                                dataPageScanEnabled);
-
-                            return null;
-                        }
+            ctx.closure().callLocal(
+                new Callable<Void>() {
+                    @Override public Void call() {
+                        onQueryRequest0(node,
+                            req.requestId(),
+                            segment,
+                            req.schemaName(),
+                            req.queries(),
+                            cacheIds,
+                            req.topologyVersion(),
+                            partsMap,
+                            parts,
+                            req.pageSize(),
+                            distributedJoins,
+                            enforceJoinOrder,
+                            false,
+                            req.timeout(),
+                            params,
+                            lazy,
+                            req.mvccSnapshot(),
+                            tx,
+                            txReq,
+                            lockFut,
+                            runCntr,
+                            dataPageScanEnabled);
+
+                        return null;
                     }
-                    , QUERY_POOL);
-            }
+                },
+                QUERY_POOL);
         }
 
         onQueryRequest0(node,
@@ -474,6 +421,10 @@ public class GridMapQueryExecutor {
      * @param parts Explicit partitions for current node.
      * @param pageSize Page size.
      * @param distributedJoins Query distributed join mode.
+     * @param enforceJoinOrder Enforce join order H2 flag.
+     * @param replicated Replicated only flag.
+     * @param timeout Query timeout.
+     * @param params Query parameters.
      * @param lazy Streaming flag.
      * @param mvccSnapshot MVCC snapshot.
      * @param tx Transaction.
@@ -504,78 +455,20 @@ public class GridMapQueryExecutor {
         @Nullable final GridH2SelectForUpdateTxDetails txDetails,
         @Nullable final CompoundLockFuture lockFut,
         @Nullable final AtomicInteger runCntr,
-        Boolean dataPageScanEnabled
-    ) {
-        MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
-
+        Boolean dataPageScanEnabled) {
         // In presence of TX, we also must always have matching details.
         assert tx == null || txDetails != null;
 
-        boolean inTx = (tx != null);
-
-        if (lazy && worker == null) {
-            // Lazy queries must be re-submitted to dedicated workers.
-            MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
-            worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this, qryCtxRegistry);
+        assert !lazy || txDetails == null : "Lazy execution of SELECT FOR UPDATE queries is not supported.";
 
-            worker.submit(new Runnable() {
-                @Override public void run() {
-                    onQueryRequest0(
-                        node,
-                        reqId,
-                        segmentId,
-                        schemaName,
-                        qrys,
-                        cacheIds,
-                        topVer,
-                        partsMap,
-                        parts,
-                        pageSize,
-                        distributedJoins,
-                        enforceJoinOrder,
-                        replicated,
-                        timeout,
-                        params,
-                        true,
-                        mvccSnapshot,
-                        tx,
-                        txDetails,
-                        lockFut,
-                        runCntr,
-                        dataPageScanEnabled);
-                }
-            });
-
-            if (lazyWorkerBusyLock.enterBusy()) {
-                try {
-                    MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
-
-                    if (oldWorker != null)
-                        oldWorker.stop(false);
-
-                    IgniteThread thread = new IgniteThread(worker);
-
-                    thread.start();
-                }
-                finally {
-                    lazyWorkerBusyLock.leaveBusy();
-                }
-            }
-            else
-                log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']');
-
-            return;
-        }
-
-        if (lazy && txDetails != null)
-            throw new IgniteSQLException("Lazy execution of SELECT FOR UPDATE queries is not supported.");
+        boolean inTx = (tx != null);
 
         // Prepare to run queries.
         GridCacheContext<?, ?> mainCctx = mainCacheContext(cacheIds);
 
         MapNodeResults nodeRess = resultsForNode(node.id());
 
-        MapQueryResults qr = null;
+        MapQueryResults qryResults = null;
 
         PartitionReservation reserved = null;
 
@@ -593,21 +486,12 @@ public class GridMapQueryExecutor {
                 );
 
                 if (reserved.failed()) {
-                    // Unregister lazy worker because re-try may never reach this node again.
-                    if (lazy)
-                        stopAndUnregisterCurrentLazyWorker();
-
                     sendRetry(node, reqId, segmentId, reserved.error());
 
                     return;
                 }
             }
 
-            qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, MapQueryLazyWorker.currentWorker(), inTx);
-
-            if (nodeRess.put(reqId, segmentId, qr) != null)
-                throw new IllegalStateException();
-
             // Prepare query context.
             DistributedJoinContext distributedJoinCtx = null;
 
@@ -630,47 +514,61 @@ public class GridMapQueryExecutor {
                 reserved
             );
 
-            qctx.lazyWorker(worker);
+            qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, inTx, lazy, qctx);
 
-            Connection conn = h2.connections().connectionForThread().connection(schemaName);
-
-            H2Utils.setupConnection(conn, distributedJoins, enforceJoinOrder);
+            // qctx is set, we have to release reservations inside of it.
+            reserved = null;
 
             qryCtxRegistry.setThreadLocal(qctx);
 
             if (distributedJoinCtx != null)
                 qryCtxRegistry.setShared(node.id(), reqId, qctx);
 
-            // qctx is set, we have to release reservations inside of it.
-            reserved = null;
+            if (nodeRess.put(reqId, segmentId, qryResults) != null)
+                throw new IllegalStateException();
 
-            try {
-                if (nodeRess.cancelled(reqId)) {
-                    qryCtxRegistry.clearShared(node.id(), reqId);
+            if (nodeRess.cancelled(reqId)) {
+                qryCtxRegistry.clearShared(node.id(), reqId);
 
-                    nodeRess.cancelRequest(reqId);
+                nodeRess.cancelRequest(reqId);
 
-                    throw new QueryCancelledException();
-                }
+                throw new QueryCancelledException();
+            }
 
-                // Run queries.
-                int qryIdx = 0;
+            // Run queries.
+            int qryIdx = 0;
 
-                boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+            boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
 
-                for (GridCacheSqlQuery qry : qrys) {
-                    ResultSet rs = null;
+            for (GridCacheSqlQuery qry : qrys) {
+                H2ConnectionWrapper connWrp = h2.connections().connectionForThread();
+
+                H2Utils.setupConnection(
+                    connWrp.connection(schemaName),
+                    distributedJoins,
+                    enforceJoinOrder,
+                    lazy
+                );
+
+                MapQueryResult res = new MapQueryResult(h2, mainCctx, node.id(), qry, params, connWrp, log);
+
+                qryResults.addResult(qryIdx, res);
+
+                try {
+                    res.lock();
 
                     boolean removeMapping = false;
+                    ResultSet rs = null;
 
                     // If we are not the target node for this replicated query, just ignore it.
                     if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
-                        String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params));
+                        String sql = qry.query();
+                        Collection<Object> params0 = F.asList(qry.parameters(params));
 
                         PreparedStatement stmt;
 
                         try {
-                            stmt = h2.connections().prepareStatement(conn, sql);
+                            stmt = h2.connections().prepareStatement(connWrp.connection(), sql);
                         }
                         catch (SQLException e) {
                             throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
@@ -680,14 +578,21 @@ public class GridMapQueryExecutor {
 
                         if (GridSqlQueryParser.isForUpdateQuery(p)) {
                             sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx);
-                            stmt = h2.connections().prepareStatement(conn, sql);
+                            stmt = h2.connections().prepareStatement(connWrp.connection(), sql);
                         }
 
                         H2Utils.bindParameters(stmt, params0);
 
                         int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
 
-                        rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx), dataPageScanEnabled);
+                        rs = h2.executeSqlQueryWithTimer(
+                            stmt,
+                            connWrp.connection(),
+                            sql,
+                            params0,
+                            opTimeout,
+                            qryResults.queryCancel(qryIdx),
+                            dataPageScanEnabled);
 
                         if (inTx) {
                             ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
@@ -733,13 +638,10 @@ public class GridMapQueryExecutor {
                         assert rs instanceof JdbcResultSet : rs.getClass();
                     }
 
-                    qr.addResult(qryIdx, qry, node.id(), rs, params);
-
-                    if (qr.cancelled()) {
-                        qr.result(qryIdx).close();
+                    res.openResult(rs);
 
+                    if (qryResults.cancelled())
                         throw new QueryCancelledException();
-                    }
 
                     if (inTx) {
                         if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) {
@@ -748,12 +650,21 @@ public class GridMapQueryExecutor {
                         }
                     }
 
+                    final GridQueryNextPageResponse msg = prepareNextPage(
+                        nodeRess,
+                        node,
+                        qryResults,
+                        qryIdx,
+                        segmentId,
+                        pageSize,
+                        removeMapping,
+                        dataPageScanEnabled
+                    );
+
                     // Send the first page.
                     if (lockFut == null)
-                        sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping, dataPageScanEnabled);
+                        sendNextPage(node, msg);
                     else {
-                        GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping, dataPageScanEnabled);
-
                         if (msg != null) {
                             lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
                                 @Override public void apply(IgniteInternalFuture<Void> future) {
@@ -773,50 +684,68 @@ public class GridMapQueryExecutor {
 
                     qryIdx++;
                 }
+                finally {
+                    try {
+                        res.unlockTables();
+                    }
+                    finally {
+                        res.unlock();
+                    }
+                }
+            } // for map queries
 
-                // All request results are in the memory in result set already, so it's ok to release partitions.
-                if (!lazy)
-                    releaseReservations();
-            }
-            catch (Throwable e){
-                releaseReservations();
-
-                throw e;
-            }
+            if (!lazy)
+                qryResults.releaseQueryContext();
         }
         catch (Throwable e) {
-            if (qr != null) {
-                nodeRess.remove(reqId, segmentId, qr);
+            if (qryResults != null) {
+                nodeRess.remove(reqId, segmentId, qryResults);
 
-                qr.cancel(false);
+                qryResults.close();
             }
+            else
+                releaseReservations();
 
-            // Unregister worker after possible cancellation.
-            if (lazy)
-                stopAndUnregisterCurrentLazyWorker();
+            if (e instanceof QueryCancelledException)
+                sendError(node, reqId, e);
+            else {
+                JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
 
-            GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
+                if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+                    sendQueryCancel(node, reqId);
+                else {
+                    GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
 
-            if (retryErr != null) {
-                final String retryCause = String.format(
-                    "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
-                    "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
-                );
+                    if (retryErr != null) {
+                        final String retryCause = String.format(
+                            "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
+                                "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
+                        );
 
-                sendRetry(node, reqId, segmentId, retryCause);
-            }
-            else {
-                U.error(log, "Failed to execute local query.", e);
+                        sendRetry(node, reqId, segmentId, retryCause);
+                    }
+                    else {
+                        QueryRetryException qryRetryErr = X.cause(e, QueryRetryException.class);
 
-                sendError(node, reqId, e);
+                        if (qryRetryErr != null)
+                            sendError(node, reqId, qryRetryErr);
+                        else {
+                            U.error(log, "Failed to execute local query.", e);
+
+                            sendError(node, reqId, e);
 
-                if (e instanceof Error)
-                    throw (Error)e;
+                            if (e instanceof Error)
+                                throw (Error)e;
+                        }
+                    }
+                }
             }
         }
         finally {
             if (reserved != null)
                 reserved.release();
+
+            qryCtxRegistry.clearThreadLocal();
         }
     }
 
@@ -947,6 +876,14 @@ public class GridMapQueryExecutor {
     /**
      * @param node Node.
      * @param qryReqId Query request ID.
+     */
+    private void sendQueryCancel(ClusterNode node, long qryReqId) {
+        sendError(node, qryReqId, new QueryCancelledException());
+    }
+
+    /**
+     * @param node Node.
+     * @param qryReqId Query request ID.
      * @param err Error.
      */
     private void sendError(ClusterNode node, long qryReqId, Throwable err) {
@@ -1003,40 +940,91 @@ public class GridMapQueryExecutor {
      * @param req Request.
      */
     private void onNextPageRequest(final ClusterNode node, final GridQueryNextPageRequest req) {
+        long reqId = req.queryRequestId();
+
         final MapNodeResults nodeRess = qryRess.get(node.id());
 
         if (nodeRess == null) {
-            sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req));
+            sendError(node, reqId, new CacheException("No node result found for request: " + req));
 
             return;
         }
-        else if (nodeRess.cancelled(req.queryRequestId())) {
-            sendError(node, req.queryRequestId(), new QueryCancelledException());
+        else if (nodeRess.cancelled(reqId)) {
+            sendQueryCancel(node, reqId);
 
             return;
         }
 
-        final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
+        final MapQueryResults qryResults = nodeRess.get(reqId, req.segmentId());
 
-        if (qr == null)
-            sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
-        else if (qr.cancelled())
-            sendError(node, req.queryRequestId(), new QueryCancelledException());
-        else {
-            Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags());
+        if (qryResults == null)
+            sendError(node, reqId, new CacheException("No query result found for request: " + req));
+        else if (qryResults.cancelled())
+            sendQueryCancel(node, reqId);
+        else
+            try {
+                QueryContext qctxReduce = qryCtxRegistry.getThreadLocal();
+
+                if (qctxReduce != null)
+                    qryCtxRegistry.clearThreadLocal();
+
+                qryCtxRegistry.setThreadLocal(qryResults.queryContext());
+
+                MapQueryResult res = qryResults.result(req.query());
+
+                assert res != null;
+
+                try {
+                    // Session isn't set for lazy=false queries.
+                    // Also session == null when result already closed.
+                    res.lock();
+                    res.lockTables();
+                    res.checkTablesVersions();
+
+                    Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags());
 
-            MapQueryLazyWorker lazyWorker = qr.lazyWorker();
+                    GridQueryNextPageResponse msg = prepareNextPage(
+                        nodeRess,
+                        node,
+                        qryResults,
+                        req.query(),
+                        req.segmentId(),
+                        req.pageSize(),
+                        false,
+                        dataPageScanEnabled);
 
-            if (lazyWorker != null) {
-                lazyWorker.submit(new Runnable() {
-                    @Override public void run() {
-                        sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false, dataPageScanEnabled);
+                    sendNextPage(node, msg);
+                }
+                finally {
+                    qryCtxRegistry.clearThreadLocal();
+
+                    if (qctxReduce != null)
+                        qryCtxRegistry.setThreadLocal(qctxReduce);
+
+                    try {
+                        res.unlockTables();
+                    }
+                    finally {
+                        res.unlock();
                     }
-                });
+                }
+            }
+            catch (Exception e) {
+                QueryRetryException retryEx = X.cause(e, QueryRetryException.class);
+
+                if (retryEx != null)
+                    sendError(node, reqId, retryEx);
+                else {
+                    JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
+
+                    if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+                        sendQueryCancel(node, reqId);
+                    else
+                        sendError(node, reqId, e);
+                }
+
+                qryResults.cancel();
             }
-            else
-                sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false, dataPageScanEnabled);
-        }
     }
 
     /**
@@ -1051,8 +1039,15 @@ public class GridMapQueryExecutor {
      * @return Next page.
      * @throws IgniteCheckedException If failed.
      */
-    private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
-        int pageSize, boolean removeMapping, Boolean dataPageScanEnabled) throws IgniteCheckedException {
+    private GridQueryNextPageResponse prepareNextPage(
+        MapNodeResults nodeRess,
+        ClusterNode node,
+        MapQueryResults qr,
+        int qry,
+        int segmentId,
+        int pageSize,
+        boolean removeMapping,
+        Boolean dataPageScanEnabled) throws IgniteCheckedException {
         MapQueryResult res = qr.result(qry);
 
         assert res != null;
@@ -1067,14 +1062,14 @@ public class GridMapQueryExecutor {
         boolean last = res.fetchNextPage(rows, pageSize, dataPageScanEnabled);
 
         if (last) {
-            res.close();
+            qr.closeResult(qry);
 
             if (qr.isAllClosed()) {
                 nodeRess.remove(qr.queryRequestId(), segmentId, qr);
 
-                // Release reservations if the last page fetched, all requests are closed and this is a lazy worker.
-                if (MapQueryLazyWorker.currentWorker() != null)
-                    releaseReservations();
+                // Clear context, release reservations
+                if (qr.isLazy())
+                    qr.releaseQueryContext();
             }
         }
 
@@ -1097,21 +1092,11 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * @param nodeRess Results.
      * @param node Node.
-     * @param qr Query results.
-     * @param qry Query.
-     * @param segmentId Index segment ID.
-     * @param pageSize Page size.
-     * @param removeMapping Remove mapping flag.
-     * @param dataPageScanEnabled If data page scan is enabled.
+     * @param msg Message to send.
      */
-    private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
-        int pageSize, boolean removeMapping, Boolean dataPageScanEnabled) {
+    private void sendNextPage(ClusterNode node, GridQueryNextPageResponse msg) {
         try {
-            GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, removeMapping,
-                dataPageScanEnabled);
-
             if (msg != null) {
                 if (node.isLocal())
                     h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
@@ -1130,6 +1115,7 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param reqId Request ID.
      * @param segmentId Index segment ID.
+     * @param retryCause Description of the retry cause.
      */
     private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) {
         try {
@@ -1153,34 +1139,4 @@ public class GridMapQueryExecutor {
             U.warn(log, "Failed to send retry message: " + e.getMessage());
         }
     }
-
-    /**
-     * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread).
-     */
-    public void stopAndUnregisterCurrentLazyWorker() {
-        MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
-
-        if (worker != null) {
-            worker.stop(false);
-
-            // Just stop is not enough as worker may be registered, but not started due to exception.
-            unregisterLazyWorker(worker);
-        }
-    }
-
-    /**
-     * Unregister lazy worker.
-     *
-     * @param worker Worker.
-     */
-    public void unregisterLazyWorker(MapQueryLazyWorker worker) {
-        lazyWorkers.remove(worker.key(), worker);
-    }
-
-    /**
-     * @return Number of registered lazy workers.
-     */
-    public int registeredLazyWorkers() {
-        return lazyWorkers.size();
-    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f139974..b2c5170 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -43,6 +43,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.QueryRetryException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -65,9 +66,11 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
 import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedUpdateRun;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
@@ -164,6 +167,9 @@ public class GridReduceQueryExecutor {
     /** Partition mapper. */
     private ReducePartitionMapper mapper;
 
+    /** Default query timeout. */
+    private long dfltQueryTimeout;
+
     /**
      * Constructor.
      *
@@ -182,6 +188,8 @@ public class GridReduceQueryExecutor {
         this.ctx = ctx;
         this.h2 = h2;
 
+        dfltQueryTimeout = IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, DFLT_RETRY_TIMEOUT);
+
         log = ctx.log(GridReduceQueryExecutor.class);
 
         mapper = new ReducePartitionMapper(ctx, log);
@@ -282,11 +290,20 @@ public class GridReduceQueryExecutor {
      */
     private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) {
         if (r != null) {
-            CacheException e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
-                ", errMsg=" + msg + ']');
+            CacheException e;
 
-            if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR)
-                e.addSuppressed(new QueryCancelledException());
+            if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR) {
+                e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
+                    ", errMsg=" + msg + ']', new QueryCancelledException());
+            }
+            else if (failCode == GridQueryFailResponse.RETRY_QUERY) {
+                e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
+                    ", errMsg=" + msg + ']', new QueryRetryException(msg));
+            }
+            else {
+                e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
+                    ", errMsg=" + msg + ']');
+            }
 
             r.setStateOnException(nodeId, e);
         }
@@ -521,6 +538,8 @@ public class GridReduceQueryExecutor {
                 dataPageScanEnabled
             );
 
+            ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn = h2.connections().detachThreadConnection();
+
             Collection<ClusterNode> nodes;
 
             // Explicit partition mapping for unstable topology.
@@ -678,7 +697,7 @@ public class GridReduceQueryExecutor {
                 if (isReplicatedOnly)
                     flags |= GridH2QueryRequest.FLAG_REPLICATED;
 
-                if (lazy && mapQrys.size() == 1)
+                if (lazy)
                     flags |= GridH2QueryRequest.FLAG_LAZY;
 
                 flags = setDataPageScanEnabled(flags, dataPageScanEnabled);
@@ -807,7 +826,10 @@ public class GridReduceQueryExecutor {
                                 cancel,
                                 dataPageScanEnabled);
 
-                            resIter = new H2FieldsIterator(res, mvccTracker, false, null);
+                            resIter = new H2FieldsIterator(res, mvccTracker, false, detachedConn);
+
+                            // don't recycle at final block
+                            detachedConn = null;
 
                             mvccTracker = null; // To prevent callback inside finally block;
                         }
@@ -874,6 +896,9 @@ public class GridReduceQueryExecutor {
                 throw resEx;
             }
             finally {
+                if (detachedConn != null)
+                    detachedConn.recycle();
+
                 if (release) {
                     releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins(), mvccTracker);
 
@@ -1039,7 +1064,7 @@ public class GridReduceQueryExecutor {
      * @return {@code true} if exception is caused by cancel.
      */
     private boolean wasCancelled(CacheException e) {
-        return X.hasSuppressed(e, QueryCancelledException.class);
+        return X.cause(e, QueryCancelledException.class) != null;
     }
 
     /**
@@ -1049,19 +1074,22 @@ public class GridReduceQueryExecutor {
      * @param r Query run.
      * @param qryReqId Query id.
      * @param distributedJoins Distributed join flag.
+     * @param mvccTracker MVCC tracker.
      */
-    public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
+    void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
         boolean distributedJoins, MvccQueryTracker mvccTracker) {
-        // For distributedJoins need always send cancel request to cleanup resources.
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
-        else {
-            for (ReduceIndex idx : r.indexes()) {
-                if (!idx.fetchedAll()) {
+
+        for (ReduceIndex idx : r.indexes()) {
+            if (!idx.fetchedAll()) {
+                if (!distributedJoins) // cancel request has been already sent for distributed join.
                     send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
 
-                    break;
-                }
+                r.setStateOnException(ctx.localNodeId(),
+                    new CacheException("Query is canceled.", new QueryCancelledException()));
+
+                break;
             }
         }
 
@@ -1362,14 +1390,13 @@ public class GridReduceQueryExecutor {
      * @param qryTimeout Query timeout.
      * @return Query retry timeout.
      */
-    private static long retryTimeout(long qryTimeout) {
+    private long retryTimeout(long qryTimeout) {
         if (qryTimeout > 0)
             return qryTimeout;
 
-        return IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, DFLT_RETRY_TIMEOUT);
+        return dfltQueryTimeout;
     }
 
-
     /**
      * Prepare map query based on original sql.
      *
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
index 48116d3..2348a3d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -17,12 +17,11 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
-import java.util.concurrent.ConcurrentHashMap;
-
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
@@ -89,7 +88,7 @@ class MapNodeResults {
                 MapQueryResults removed = res.remove(key);
 
                 if (removed != null)
-                    removed.cancel(true);
+                    removed.cancel();
             }
         }
 
@@ -144,11 +143,10 @@ class MapNodeResults {
      */
     public void cancelAll() {
         for (MapQueryResults ress : res.values())
-            ress.cancel(true);
+            ress.cancel();
 
         // Cancel update requests
         for (GridQueryCancel upd: updCancels.values())
             upd.cancel();
     }
-
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
deleted file mode 100644
index bc1f896..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.LongAdder;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Worker for lazy query execution.
- */
-public class MapQueryLazyWorker extends GridWorker {
-    /** Lazy thread flag. */
-    private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>();
-
-    /** Active lazy worker count (for testing purposes). */
-    private static final LongAdder ACTIVE_CNT = new LongAdder();
-
-    /** Task to be executed. */
-    private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
-
-    /** Key. */
-    private final MapQueryLazyWorkerKey key;
-
-    /** Map query executor. */
-    private final GridMapQueryExecutor exec;
-
-    /** Query context registry. */
-    private final QueryContextRegistry qryCtxRegistry;
-
-    /** Latch decremented when worker finishes. */
-    private final CountDownLatch stopLatch = new CountDownLatch(1);
-
-    /** Map query result. */
-    private volatile MapQueryResult res;
-
-    /**
-     * Constructor.
-     *
-     * @param instanceName Instance name.
-     * @param key Lazy worker key.
-     * @param log Logger.
-     * @param exec Map query executor.
-     * @param qryCtxRegistry Query context registry.
-     */
-    public MapQueryLazyWorker(@Nullable String instanceName, MapQueryLazyWorkerKey key, IgniteLogger log,
-        GridMapQueryExecutor exec, QueryContextRegistry qryCtxRegistry) {
-        super(instanceName, workerName(instanceName, key), log);
-
-        this.key = key;
-        this.exec = exec;
-        this.qryCtxRegistry = qryCtxRegistry;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        LAZY_WORKER.set(this);
-
-        ACTIVE_CNT.increment();
-
-        try {
-            while (!isCancelled()) {
-                Runnable task = tasks.take();
-
-                if (task != null) {
-                    if (!exec.busyLock().enterBusy())
-                        return;
-
-                    try {
-                        task.run();
-                    }
-                    finally {
-                        exec.busyLock().leaveBusy();
-                    }
-                }
-            }
-        }
-        finally {
-            if (res != null)
-                res.close();
-
-            LAZY_WORKER.set(null);
-
-            ACTIVE_CNT.decrement();
-
-            exec.unregisterLazyWorker(this);
-        }
-    }
-
-    /**
-     * Submit task to worker.
-     *
-     * @param task Task to be executed.
-     */
-    public void submit(Runnable task) {
-        tasks.add(task);
-    }
-
-    /**
-     * @return Worker key.
-     */
-    public MapQueryLazyWorkerKey key() {
-        return key;
-    }
-
-    /**
-     * Stop the worker.
-     * @param nodeStop Node is stopping.
-     */
-    public void stop(final boolean nodeStop) {
-        if (MapQueryLazyWorker.currentWorker() == null)
-            submit(new Runnable() {
-                @Override public void run() {
-                    stop(nodeStop);
-                }
-            });
-        else {
-            QueryContext qctx = qryCtxRegistry.getThreadLocal();
-
-            if (qctx != null) {
-                qctx.clearContext(nodeStop);
-
-                qryCtxRegistry.clearThreadLocal();
-            }
-
-            isCancelled = true;
-
-            stopLatch.countDown();
-        }
-    }
-
-    /**
-     * Await worker stop.
-     */
-    public void awaitStop() {
-        try {
-            U.await(stopLatch);
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
-        }
-    }
-
-    /**
-     * @param res Map query result.
-     */
-    public void result(MapQueryResult res) {
-        this.res = res;
-    }
-
-    /**
-     * @return Current worker or {@code null} if call is performed not from lazy worker thread.
-     */
-    @Nullable public static MapQueryLazyWorker currentWorker() {
-        return LAZY_WORKER.get();
-    }
-
-    /**
-     * @return Active workers count.
-     */
-    public static int activeCount() {
-        return ACTIVE_CNT.intValue();
-    }
-
-    /**
-     * Construct worker name.
-     *
-     * @param instanceName Instance name.
-     * @param key Key.
-     * @return Name.
-     */
-    private static String workerName(String instanceName, MapQueryLazyWorkerKey key) {
-        return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" +
-            key.segment();
-    }
-}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
deleted file mode 100644
index a0f5ebb..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep;
-
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.UUID;
-
-/**
- * Key to identify lazy worker.
- */
-public class MapQueryLazyWorkerKey {
-    /** Client node ID. */
-    private final UUID nodeId;
-
-    /** Query request ID. */
-    private final long qryReqId;
-
-    /** Segment. */
-    private final int segment;
-
-    /**
-     * Constructor.
-     *
-     * @param nodeId Node ID.
-     * @param qryReqId Query request ID.
-     * @param segment Segment.
-     */
-    public MapQueryLazyWorkerKey(UUID nodeId, long qryReqId, int segment) {
-        this.nodeId = nodeId;
-        this.qryReqId = qryReqId;
-        this.segment = segment;
-    }
-
-    /**
-     * @return Node id.
-     */
-    public UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @return Query request ID.
-     */
-    public long queryRequestId() {
-        return qryReqId;
-    }
-
-    /**
-     * @return Segment.
-     */
-    public int segment() {
-        return segment;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int res = nodeId.hashCode();
-
-        res = 31 * res + (int)(qryReqId ^ (qryReqId >>> 32));
-        res = 31 * res + segment;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object obj) {
-        if (obj != null && obj instanceof MapQueryLazyWorkerKey) {
-            MapQueryLazyWorkerKey other = (MapQueryLazyWorkerKey)obj;
-
-            return F.eq(qryReqId, other.qryReqId) && F.eq(nodeId, other.nodeId) && F.eq(segment, other.segment);
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(MapQueryLazyWorkerKey.class, this);
-    }
-}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index c823023..c14cf7f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -22,16 +22,23 @@ import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.engine.Session;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.result.LazyResult;
 import org.h2.result.ResultInterface;
@@ -65,12 +72,6 @@ class MapQueryResult {
     private final IgniteH2Indexing h2;
 
     /** */
-    private final ResultInterface res;
-
-    /** */
-    private final ResultSet rs;
-
-    /** */
     private final GridCacheContext<?, ?> cctx;
 
     /** */
@@ -80,13 +81,16 @@ class MapQueryResult {
     private final UUID qrySrcNodeId;
 
     /** */
-    private final int cols;
+    private volatile Result res;
 
     /** */
-    private int page;
+    private final IgniteLogger log;
+
+    /** */
+    private final Object[] params;
 
     /** */
-    private final int rowCnt;
+    private int page;
 
     /** */
     private boolean cpNeeded;
@@ -94,51 +98,40 @@ class MapQueryResult {
     /** */
     private volatile boolean closed;
 
-    /** */
-    private final Object[] params;
+    /** H2 session. */
+    private final Session ses;
 
-    /** Lazy worker. */
-    private final MapQueryLazyWorker lazyWorker;
+    /** Detached connection. Used for lazy execution to prevent connection sharing. */
+    private ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn;
+
+    /** */
+    private final ReentrantLock lock = new ReentrantLock();
 
     /**
-     * @param rs Result set.
+     * @param h2 H2 indexing.
      * @param cctx Cache context.
      * @param qrySrcNodeId Query source node.
      * @param qry Query.
      * @param params Query params.
-     * @param lazyWorker Lazy worker.
+     * @param conn H2 connection wrapper.
+     * @param log Logger.
      */
-    MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable GridCacheContext cctx,
-        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
+    MapQueryResult(IgniteH2Indexing h2, @Nullable GridCacheContext cctx,
+        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, H2ConnectionWrapper conn, IgniteLogger log) {
         this.h2 = h2;
         this.cctx = cctx;
         this.qry = qry;
         this.params = params;
         this.qrySrcNodeId = qrySrcNodeId;
         this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
-        this.lazyWorker = lazyWorker;
-
-        if (rs != null) {
-            this.rs = rs;
+        this.log = log;
 
-            try {
-                res = (ResultInterface)RESULT_FIELD.get(rs);
-            }
-            catch (IllegalAccessException e) {
-                throw new IllegalStateException(e); // Must not happen.
-            }
-
-            rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
-            cols = res.getVisibleColumnCount();
-        }
-        else {
-            this.rs = null;
-            this.res = null;
-            this.cols = -1;
-            this.rowCnt = -1;
+        ses = H2Utils.session(conn.connection());
+    }
 
-            closed = true;
-        }
+    /** */
+    void openResult(ResultSet rs) {
+        res = new Result(rs);
     }
 
     /**
@@ -152,14 +145,18 @@ class MapQueryResult {
      * @return Row count.
      */
     int rowCount() {
-        return rowCnt;
+        assert res != null;
+
+        return res.rowCnt;
     }
 
     /**
      * @return Column ocunt.
      */
     int columnCount() {
-        return cols;
+        assert res != null;
+
+        return res.cols;
     }
 
     /**
@@ -175,12 +172,14 @@ class MapQueryResult {
      * @param dataPageScanEnabled If data page scan is enabled.
      * @return {@code true} If there are no more rows available.
      */
-    synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize, Boolean dataPageScanEnabled) {
-        assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
+    boolean fetchNextPage(List<Value[]> rows, int pageSize, Boolean dataPageScanEnabled) {
+        assert lock.isHeldByCurrentThread();
 
         if (closed)
             return true;
 
+        assert res != null;
+
         boolean readEvt = cctx != null && cctx.name() != null && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         page++;
@@ -189,10 +188,10 @@ class MapQueryResult {
 
         try {
             for (int i = 0; i < pageSize; i++) {
-                if (!res.next())
+                if (!res.res.next())
                     return true;
 
-                Value[] row = res.currentRow();
+                Value[] row = res.res.currentRow();
 
                 if (cpNeeded) {
                     boolean copied = false;
@@ -241,10 +240,13 @@ class MapQueryResult {
                         row(row)));
                 }
 
-                rows.add(res.currentRow());
+                rows.add(res.res.currentRow());
             }
 
-            return !res.hasNext();
+            if (detachedConn == null && res.res.hasNext())
+                detachedConn = h2.connections().detachThreadConnection();
+
+            return !res.res.hasNext();
         }
         finally {
             CacheDataTree.setDataPageScanEnabled(false);
@@ -267,31 +269,101 @@ class MapQueryResult {
     /**
      * Close the result.
      */
-    public void close() {
-        if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) {
-            lazyWorker.submit(new Runnable() {
-                @Override public void run() {
-                    close();
-                }
-            });
-
-            lazyWorker.awaitStop();
+    void close() {
+        assert lock.isHeldByCurrentThread();
 
+        if (closed)
             return;
-        }
 
-        synchronized (this) {
-            assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
+        closed = true;
+
+        if (res != null)
+            res.close();
+
+        if (detachedConn != null)
+            detachedConn.recycle();
+
+        detachedConn = null;
+    }
+
+    /** */
+    public void lock() {
+        if (!lock.isHeldByCurrentThread())
+            lock.lock();
+    }
 
-            if (closed)
-                return;
+    /** */
+    public void lockTables() {
+        if (ses.isLazyQueryExecution() && !closed)
+            GridH2Table.readLockTables(ses);
+    }
 
-            closed = true;
+    /** */
+    public void unlock() {
+        if (lock.isHeldByCurrentThread())
+            lock.unlock();
+    }
 
-            U.closeQuiet(rs);
+    /** */
+    public void unlockTables() {
+        if (ses.isLazyQueryExecution())
+            GridH2Table.unlockTables(ses);
+    }
+
+    /**
+     *
+     */
+    public void checkTablesVersions() {
+        if (ses.isLazyQueryExecution())
+            GridH2Table.checkTablesVersions(ses);
+    }
+
+    /** */
+    private class Result {
+        /** */
+        private final ResultInterface res;
+
+        /** */
+        private final ResultSet rs;
+
+        /** */
+        private final int cols;
+
+        /** */
+        private final int rowCnt;
+
+        /**
+         * Constructor.
+         *
+         * @param rs H2 result set.
+         */
+        Result(ResultSet rs) {
+            if (rs != null) {
+                this.rs = rs;
+
+                try {
+                    res = (ResultInterface)RESULT_FIELD.get(rs);
+                }
+                catch (IllegalAccessException e) {
+                    throw new IllegalStateException(e); // Must not happen.
+                }
+
+                rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
+                cols = res.getVisibleColumnCount();
+            }
+            else {
+                this.rs = null;
+                this.res = null;
+                this.cols = -1;
+                this.rowCnt = -1;
+
+                closed = true;
+            }
+        }
 
-            if (lazyWorker != null)
-                lazyWorker.stop(false);
+        /** */
+        void close() {
+            U.close(rs, log);
         }
     }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index acb4f4e..d61e09a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -17,20 +17,19 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import java.sql.ResultSet;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Mapper query results.
  */
 class MapQueryResults {
-    /** H@ indexing. */
+    /** H2 indexing. */
     private final IgniteH2Indexing h2;
 
     /** */
@@ -45,8 +44,8 @@ class MapQueryResults {
     /** */
     private final GridCacheContext<?, ?> cctx;
 
-    /** Lazy worker. */
-    private final MapQueryLazyWorker lazyWorker;
+    /** Lazy mode. */
+    private final boolean lazy;
 
     /** */
     private volatile boolean cancelled;
@@ -54,22 +53,28 @@ class MapQueryResults {
     /** {@code SELECT FOR UPDATE} flag. */
     private final boolean forUpdate;
 
+    /** Query context. */
+    private final QueryContext qctx;
+
     /**
      * Constructor.
+     *
      * @param h2 Indexing instance.
      * @param qryReqId Query request ID.
      * @param qrys Number of queries.
      * @param cctx Cache context.
-     * @param lazyWorker Lazy worker (if any).
      * @param forUpdate {@code SELECT FOR UPDATE} flag.
+     * @param lazy Lazy flag.
+     * @param qctx Query context.
      */
     MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable GridCacheContext<?, ?> cctx,
-        @Nullable MapQueryLazyWorker lazyWorker, boolean forUpdate) {
+        boolean forUpdate, boolean lazy, QueryContext qctx) {
         this.forUpdate = forUpdate;
         this.h2 = h2;
         this.qryReqId = qryReqId;
         this.cctx = cctx;
-        this.lazyWorker = lazyWorker;
+        this.lazy = lazy;
+        this.qctx = qctx;
 
         results = new AtomicReferenceArray<>(qrys);
         cancels = new GridQueryCancel[qrys];
@@ -97,27 +102,12 @@ class MapQueryResults {
     }
 
     /**
-     * @return Lazy worker.
-     */
-    MapQueryLazyWorker lazyWorker() {
-        return lazyWorker;
-    }
-
-    /**
      * Add result.
-     * @param qry Query result index.
-     * @param q Query object.
-     * @param qrySrcNodeId Query source node.
-     * @param rs Result set.
-     * @param params Query arguments.
+     * @param qryIdx Query result index.
+     * @param res Result.
      */
-    void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
-        MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params, lazyWorker);
-
-        if (lazyWorker != null)
-            lazyWorker.result(res);
-
-        if (!results.compareAndSet(qry, null, res))
+    void addResult(int qryIdx, MapQueryResult res) {
+        if (!results.compareAndSet(qryIdx, null, res))
             throw new IllegalStateException();
     }
 
@@ -138,29 +128,66 @@ class MapQueryResults {
     /**
      * Cancels the query.
      */
-    void cancel(boolean forceQryCancel) {
-        if (cancelled)
-            return;
-
-        cancelled = true;
-
-        for (int i = 0; i < results.length(); i++) {
-            MapQueryResult res = results.get(i);
-
-            if (res != null) {
-                res.close();
+    void cancel() {
+        synchronized (this) {
+            if (cancelled)
+                return;
 
-                continue;
-            }
+            cancelled = true;
 
-            // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable).
-            if (forceQryCancel) {
+            for (int i = 0; i < results.length(); i++) {
                 GridQueryCancel cancel = cancels[i];
 
                 if (cancel != null)
                     cancel.cancel();
             }
         }
+
+        // The closing result set is synchronized by themselves.
+        // Include to synchronize block may be cause deadlock on <this> and MapQueryResult#lock.
+        close();
+    }
+
+    /**
+     * Wrap MapQueryResult#close to synchronize close vs cancel.
+     * We have do it because connection returns to pool after close ResultSet but the whole MapQuery
+     * (that may contains several queries) may be canceled later.
+     *
+     * @param idx Map query (result) index.
+     */
+    void closeResult(int idx) {
+        MapQueryResult res = results.get(idx);
+
+        if (res != null && !res.closed()) {
+            try {
+                // Session isn't set for lazy=false queries.
+                // Also session == null when result already closed.
+                res.lock();
+                res.lockTables();
+
+                synchronized (this) {
+                    res.close();
+
+                    // The statement of the closed result must not be canceled
+                    // because statement & connection may be reused.
+                    cancels[idx] = null;
+                }
+            }
+            finally {
+                res.unlock();
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    public void close() {
+        for (int i = 0; i < results.length(); i++)
+            closeResult(i);
+
+        if (lazy)
+            releaseQueryContext();
     }
 
     /**
@@ -183,4 +210,28 @@ class MapQueryResults {
     public boolean isForUpdate() {
         return forUpdate;
     }
+
+    /**
+     * @return Query context.
+     */
+    public QueryContext queryContext() {
+        return qctx;
+    }
+
+    /**
+     * Release query context.
+     */
+    public void releaseQueryContext() {
+        h2.queryContextRegistry().clearThreadLocal();
+
+        if (qctx.distributedJoinContext() == null)
+            qctx.clearContext(false);
+    }
+
+    /**
+     * @return Lazy flag.
+     */
+    public boolean isLazy() {
+        return lazy;
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
index a3d3167..8af5b65 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java
@@ -46,21 +46,12 @@ public class PartitionReservation {
     }
 
     /**
-     * Constructor for failed reservation.
-     *
-     * @param err Error message.
-     */
-    public PartitionReservation(String err) {
-        this(null, err);
-    }
-
-    /**
      * Base constructor.
      *
      * @param reserved Reserved partitions.
      * @param err Error message.
      */
-    private PartitionReservation(@Nullable List<GridReservable> reserved, @Nullable String err) {
+    public  PartitionReservation(@Nullable List<GridReservable> reserved, @Nullable String err) {
         this.reserved = reserved;
         this.err = err;
     }
@@ -86,7 +77,9 @@ public class PartitionReservation {
         if (!releaseGuard.compareAndSet(false, true))
             return;
 
-        for (int i = 0; i < reserved.size(); i++)
-            reserved.get(i).release();
+        if (reserved != null) {
+            for (int i = 0; i < reserved.size(); i++)
+                reserved.get(i).release();
+        }
     }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
index b720b57..dfaef24 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
@@ -109,7 +109,8 @@ public class PartitionReservationManager {
 
             // Cache was not found, probably was not deployed yet.
             if (cctx == null) {
-                return new PartitionReservation(String.format("Failed to reserve partitions for query (cache is not " +
+                return new PartitionReservation(reserved,
+                    String.format("Failed to reserve partitions for query (cache is not " +
                     "found on local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
                     ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i)));
             }
@@ -125,7 +126,8 @@ public class PartitionReservationManager {
             if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
                 if (r != REPLICATED_RESERVABLE) {
                     if (!r.reserve())
-                        return new PartitionReservation(String.format("Failed to reserve partitions for query (group " +
+                        return new PartitionReservation(reserved,
+                            String.format("Failed to reserve partitions for query (group " +
                             "reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
                             "cacheName=%s]",ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name()));
 
@@ -144,7 +146,8 @@ public class PartitionReservationManager {
                             GridDhtPartitionState partState = part != null ? part.state() : null;
 
                             if (partState != OWNING)
-                                return new PartitionReservation(String.format("Failed to reserve partitions for " +
+                                return new PartitionReservation(reserved,
+                                        String.format("Failed to reserve partitions for " +
                                         "query (partition of REPLICATED cache is not in OWNING state) [" +
                                         "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
                                         "cacheName=%s, part=%s, partFound=%s, partState=%s]",
@@ -179,7 +182,8 @@ public class PartitionReservationManager {
                             if (partState == LOST)
                                 ignoreLostPartitionIfPossible(cctx, part);
                             else {
-                                return new PartitionReservation(String.format("Failed to reserve partitions " +
+                                return new PartitionReservation(reserved,
+                                        String.format("Failed to reserve partitions " +
                                         "for query (partition of PARTITIONED cache is not found or not in OWNING " +
                                         "state) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
                                         "cacheName=%s, part=%s, partFound=%s, partState=%s]",
@@ -197,7 +201,8 @@ public class PartitionReservationManager {
                         }
 
                         if (!part.reserve()) {
-                            return new PartitionReservation(String.format("Failed to reserve partitions for query " +
+                            return new PartitionReservation(reserved,
+                                    String.format("Failed to reserve partitions for query " +
                                     "(partition of PARTITIONED cache cannot be reserved) [" +
                                     "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
                                     "cacheName=%s, part=%s, partFound=%s, partState=%s]",
@@ -224,7 +229,8 @@ public class PartitionReservationManager {
                             if (partState == LOST)
                                 ignoreLostPartitionIfPossible(cctx, part);
                             else {
-                                return new PartitionReservation(String.format("Failed to reserve partitions for " +
+                                return new PartitionReservation(reserved,
+                                        String.format("Failed to reserve partitions for " +
                                         "query (partition of PARTITIONED cache is not in OWNING state after " +
                                         "reservation) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, " +
                                         "cacheId=%s, cacheName=%s, part=%s, partState=%s]",
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
index 4437dd6..1a25c50 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceResultPage.java
@@ -17,6 +17,15 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.RandomAccess;
+import java.util.UUID;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
@@ -26,15 +35,6 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.value.Value;
 
-import javax.cache.CacheException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.fillArray;
 
 /**
@@ -73,9 +73,11 @@ public class ReduceResultPage {
             Collection<?> plainRows = res.plainRows();
 
             if (plainRows != null) {
+                assert plainRows instanceof RandomAccess : "instance of " + plainRows.getClass();
+
                 rowsInPage = plainRows.size();
 
-                if (rowsInPage == 0 || ((ArrayList<Value[]>)plainRows).get(0).length == res.columns())
+                if (rowsInPage == 0 || ((List<Value[]>)plainRows).get(0).length == res.columns())
                     rows = (Iterator<Value[]>)plainRows.iterator();
                 else {
                     // If it's a result of SELECT FOR UPDATE (we can tell by difference in number
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/FunctionalQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/client/FunctionalQueryTest.java
index d10ed1a..42b2ccf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/FunctionalQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/FunctionalQueryTest.java
@@ -109,23 +109,37 @@ public class FunctionalQueryTest {
                 }
             }
 
-            // Fields query
-            SqlFieldsQuery qry = new SqlFieldsQuery("select id, name from Person where id >= ?")
-                .setArgs(minId)
-                .setPageSize(pageSize);
+            checkSqlFieldsQuery(cache, minId, pageSize, expSize, exp, true);
+            checkSqlFieldsQuery(cache, minId, pageSize, expSize, exp, false);
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param minId Minimal ID.
+     * @param pageSize Page size.
+     * @param expSize The size of the expected results.
+     * @param exp Expected results.
+     * @param lazy Lazy mode flag.
+     */
+    private void checkSqlFieldsQuery(ClientCache<Integer, Person> cache, int minId, int pageSize, int expSize,
+        Map<Integer, Person> exp, boolean lazy) {
+        SqlFieldsQuery qry = new SqlFieldsQuery("select id, name from Person where id >= ?")
+            .setArgs(minId)
+            .setPageSize(pageSize)
+            .setLazy(lazy);
 
-            try (QueryCursor<List<?>> cur = cache.query(qry)) {
-                List<List<?>> res = cur.getAll();
+        try (QueryCursor<List<?>> cur = cache.query(qry)) {
+            List<List<?>> res = cur.getAll();
 
-                assertEquals(expSize, res.size());
+            assertEquals(expSize, res.size());
 
-                Map<Integer, Person> act = res.stream().collect(Collectors.toMap(
-                    r -> Integer.parseInt(r.get(0).toString()),
-                    r -> new Person(Integer.parseInt(r.get(0).toString()), r.get(1).toString())
-                ));
+            Map<Integer, Person> act = res.stream().collect(Collectors.toMap(
+                r -> Integer.parseInt(r.get(0).toString()),
+                r -> new Person(Integer.parseInt(r.get(0).toString()), r.get(1).toString())
+            ));
 
-                assertEquals(exp, act);
-            }
+            assertEquals(exp, act);
         }
     }
 
@@ -137,7 +151,7 @@ public class FunctionalQueryTest {
      */
     @Test
     public void testSql() throws Exception {
-        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
+        try (Ignite ignored = Ignition.start(Config.getServerConfiguration()); Ignite ignored2 = Ignition.start(Config.getServerConfiguration());
              IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER))
         ) {
             client.query(
@@ -147,19 +161,32 @@ public class FunctionalQueryTest {
                 )).setSchema("PUBLIC")
             ).getAll();
 
-            int key = 1;
-            Person val = new Person(key, "Person 1");
+            final int KEY_COUNT = 10;
+
+            for (int i = 0; i < KEY_COUNT; ++i) {
+                int key = i;
+                Person val = new Person(key, "Person " + i);
 
-            client.query(new SqlFieldsQuery(
-                "INSERT INTO Person(id, name) VALUES(?, ?)"
-            ).setArgs(val.getId(), val.getName()).setSchema("PUBLIC"))
-                .getAll();
+                client.query(new SqlFieldsQuery(
+                    "INSERT INTO Person(id, name) VALUES(?, ?)"
+                ).setArgs(val.getId(), val.getName()).setSchema("PUBLIC"))
+                    .getAll();
+            }
 
             Object cachedName = client.query(
-                new SqlFieldsQuery("SELECT name from Person WHERE id=?").setArgs(key).setSchema("PUBLIC")
+                new SqlFieldsQuery("SELECT name from Person WHERE id=?").setArgs(1).setSchema("PUBLIC")
             ).getAll().iterator().next().iterator().next();
 
-            assertEquals(val.getName(), cachedName);
+            assertEquals("Person 1", cachedName);
+
+            List<List<?>> rows = client.query(
+                new SqlFieldsQuery("SELECT * from Person WHERE id >= ?")
+                    .setSchema("PUBLIC")
+                    .setArgs(0)
+                    .setPageSize(1)
+            ).getAll();
+
+            assertEquals(KEY_COUNT, rows.size());
         }
     }
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
index 51e8297..8e83b57 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
@@ -86,7 +86,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
         int partsFilled = fillAllPartitions(cache, aff);
 
         SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
-            .setLazy(true)
             .setPageSize(1);
 
         FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);
@@ -134,7 +133,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
         int partsFilled = fillAllPartitions(cache, aff);
 
         SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
-            .setLazy(true)
             .setPageSize(1);
 
         FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
index c1034f9..f762416 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.processors.GridProcessor;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -95,98 +96,97 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
     /** */
     @Test
     public void testRemoteQueryExecutionTimeout() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true, true);
     }
 
     /** */
     @Test
     public void testRemoteQueryWithMergeTableTimeout() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true, false);
     }
 
     /** */
     @Test
     public void testRemoteQueryExecutionCancel0() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false, true);
     }
 
     /** */
     @Test
     public void testRemoteQueryExecutionCancel1() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false, true);
     }
 
     /** */
     @Test
     public void testRemoteQueryExecutionCancel2() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false, true);
     }
 
     /** */
     @Test
     public void testRemoteQueryExecutionCancel3() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false, true);
     }
 
     /** */
     @Test
     public void testRemoteQueryWithMergeTableCancel0() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false, false);
     }
 
     /** */
     @Test
     public void testRemoteQueryWithMergeTableCancel1() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false, false);
     }
 
     /** */
     @Test
     public void testRemoteQueryWithMergeTableCancel2() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false, false);
     }
 
     /** */
     @Test
     public void testRemoteQueryWithMergeTableCancel3() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false, false);
     }
 
     /** */
     @Test
     public void testRemoteQueryWithoutMergeTableCancel0() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false, false);
     }
 
     /** */
     @Test
     public void testRemoteQueryWithoutMergeTableCancel1() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false, false);
     }
 
     /** */
     @Test
     public void testRemoteQueryWithoutMergeTableCancel2() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false, false);
     }
 
     /** */
     @Test
     public void testRemoteQueryWithoutMergeTableCancel3() throws Exception {
-        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, false);
     }
 
     /** */
     @Test
     public void testRemoteQueryAlreadyFinishedStop() throws Exception {
-        testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false);
+        testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false, false);
     }
 
     /** */
     private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit,
-                                 boolean timeout) throws Exception {
+                                 boolean timeout, boolean checkCanceled) throws Exception {
         try (Ignite client = startGrid("client")) {
-
             IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
 
             assertEquals(0, cache.localSize());
@@ -213,7 +213,8 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
                 qry.setTimeout(timeoutUnits, timeUnit);
 
                 cursor = cache.query(qry);
-            } else {
+            }
+            else {
                 cursor = cache.query(qry);
 
                 client.scheduler().runLocal(new Runnable() {
@@ -223,13 +224,16 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
                 }, timeoutUnits, timeUnit);
             }
 
-            try(QueryCursor<List<?>> ignored = cursor) {
-                cursor.iterator();
+            try (QueryCursor<List<?>> ignored = cursor) {
+                cursor.getAll();
+
+                if (checkCanceled)
+                    fail("Query not canceled");
             }
             catch (CacheException ex) {
                 log().error("Got expected exception", ex);
 
-                assertTrue("Must throw correct exception", ex.getCause() instanceof QueryCancelledException);
+                assertNotNull("Must throw correct exception", X.cause(ex, QueryCancelledException.class));
             }
 
             // Give some time to clean up.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
index b3a1a53..f1652c3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
@@ -64,6 +64,11 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
         "where pr.companyId = co._key\n" +
         "order by co._key, pr._key ";
 
+    protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" +
+        "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
+        "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
+        "order by pe.id desc";
+
     /** */
     protected static final int GRID_CNT = 2;
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index b33fe24..406eaa5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
 /**
@@ -104,11 +105,11 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
 
         assertEquals(broadcastQry, plan.contains("batched:broadcast"));
 
-        final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll();
+        final List<List<?>> goldenRes = grid(0).cache("pu").query(qry0).getAll();
 
         Thread.sleep(3000);
 
-        assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll());
+        assertEquals(goldenRes, grid(0).cache("pu").query(qry0).getAll());
 
         final SqlFieldsQuery qry1;
 
@@ -125,7 +126,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
 
         final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll();
 
-        assertFalse(pRes.isEmpty());
+        assertFalse(goldenRes.isEmpty());
         assertFalse(rRes.isEmpty());
 
         final AtomicInteger qryCnt = new AtomicInteger();
@@ -164,9 +165,12 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
                             qry.setPageSize(smallPageSize ? 30 : 1000);
 
                             try {
-                                assertEquals(pRes, cache.query(qry).getAll());
+                                assertEquals(goldenRes, cache.query(qry).getAll());
                             }
                             catch (CacheException e) {
+                                if (!smallPageSize)
+                                    log.error("Unexpected exception at the test", e);
+
                                 assertTrue("On large page size must retry.", smallPageSize);
 
                                 boolean failedOnRemoteFetch = false;
@@ -266,7 +270,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
             }
         }, restartThreadsNum, "restart-thread");
 
-        Thread.sleep(duration);
+        GridTestUtils.waitForCondition(() -> fail.get(), duration);
 
         info("Stopping...");
 
@@ -281,4 +285,4 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
 
         info("Stopped.");
     }
-}
+}
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
index 4de0497..0f24196 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.GridProcessor;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.junit.Test;
 
@@ -42,54 +43,54 @@ public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extend
     /** */
     @Test
     public void testCancel1() throws Exception {
-        testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(grid(0), "pe", QRY_LONG, 1, TimeUnit.MILLISECONDS, false, true);
     }
 
     /** */
     @Test
     public void testCancel2() throws Exception {
-        testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(grid(0), "pe", QRY_LONG, 50, TimeUnit.MILLISECONDS, false, true);
     }
 
     /** */
     @Test
     public void testCancel3() throws Exception {
-        testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(grid(0), "pe", QRY_LONG, 100, TimeUnit.MILLISECONDS, false, false);
     }
 
     /** */
     @Test
     public void testCancel4() throws Exception {
-        testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(grid(0), "pe", QRY_LONG, 500, TimeUnit.MILLISECONDS, false, false);
     }
 
     /** */
     @Test
     public void testTimeout1() throws Exception {
-        testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, true);
+        testQueryCancel(grid(0), "pe", QRY_LONG, 1, TimeUnit.MILLISECONDS, true, true);
     }
 
     /** */
     @Test
     public void testTimeout2() throws Exception {
-        testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, true);
+        testQueryCancel(grid(0), "pe", QRY_LONG, 50, TimeUnit.MILLISECONDS, true, true);
     }
 
     /** */
     @Test
     public void testTimeout3() throws Exception {
-        testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, true);
+        testQueryCancel(grid(0), "pe", QRY_LONG, 100, TimeUnit.MILLISECONDS, true, false);
     }
 
     /** */
     @Test
     public void testTimeout4() throws Exception {
-        testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, true);
+        testQueryCancel(grid(0), "pe", QRY_LONG, 500, TimeUnit.MILLISECONDS, true, false);
     }
 
     /** */
     private void testQueryCancel(Ignite ignite, String cacheName, String sql, int timeoutUnits, TimeUnit timeUnit,
-                           boolean timeout) throws Exception {
+                           boolean timeout, boolean checkCanceled) throws Exception {
         SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true);
 
         IgniteCache<Object, Object> cache = ignite.cache(cacheName);
@@ -110,12 +111,15 @@ public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extend
         }
 
         try (QueryCursor<List<?>> ignored = cursor) {
-            cursor.iterator();
+            cursor.getAll();
+
+            if (checkCanceled)
+                fail("Query not canceled");
         }
         catch (CacheException ex) {
             log().error("Got expected exception", ex);
 
-            assertTrue("Must throw correct exception", ex.getCause() instanceof QueryCancelledException);
+            assertNotNull("Must throw correct exception", X.cause(ex, QueryCancelledException.class));
         }
 
         // Give some time to clean up.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index 5b3ba97..691cf5c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.StampedLock;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -39,6 +40,7 @@ import org.apache.ignite.Ignition;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.QueryRetryException;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -57,6 +59,7 @@ import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDi
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.junit.Test;
@@ -639,6 +642,8 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
      */
     @Test
     public void testQueryConsistencyMultithreaded() throws Exception {
+        final int KEY_COUNT = 5000;
+
         // Start complex topology.
         ignitionStart(serverConfiguration(1));
         ignitionStart(serverConfiguration(2));
@@ -650,7 +655,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
 
         run(cli, createSql);
 
-        put(cli, 0, 5000);
+        put(cli, 0, KEY_COUNT);
 
         final AtomicBoolean stopped = new AtomicBoolean();
 
@@ -698,17 +703,24 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
         IgniteInternalFuture qryFut = multithreadedAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
                 while (!stopped.get()) {
-                    Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
+                    try {
+                        Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5));
 
-                    IgniteCache<BinaryObject, BinaryObject> cache = node.cache(CACHE_NAME).withKeepBinary();
+                        IgniteCache<BinaryObject, BinaryObject> cache = node.cache(CACHE_NAME).withKeepBinary();
 
-                    String valTypeName = ((IgniteEx)node).context().query().types(CACHE_NAME)
-                        .iterator().next().valueTypeName();
+                        String valTypeName = ((IgniteEx)node).context().query().types(CACHE_NAME)
+                            .iterator().next().valueTypeName();
 
-                    List<Cache.Entry<BinaryObject, BinaryObject>> res = cache.query(
-                        new SqlQuery<BinaryObject, BinaryObject>(valTypeName, "from " + TBL_NAME)).getAll();
+                        List<Cache.Entry<BinaryObject, BinaryObject>> res = cache.query(
+                            new SqlQuery<BinaryObject, BinaryObject>(valTypeName, "from " + TBL_NAME)).getAll();
 
-                    assertEquals(5000, res.size());
+                        assertEquals(KEY_COUNT, res.size());
+                    }
+                    catch (Exception e) {
+                        // Swallow retry exceptions.
+                        if (X.cause(e, QueryRetryException.class) == null)
+                            throw e;
+                    }
                 }
 
                 return null;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java
index 1df9e90..4356383 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryRetryException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cluster.ClusterNode;
@@ -45,6 +46,7 @@ import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.lang.IgnitePredicate;
 
 /**
@@ -393,28 +395,35 @@ public abstract class DynamicIndexAbstractSelfTest extends AbstractSchemaSelfTes
      * @param expSize Expected size.
      */
     protected static void assertSqlSimpleData(Ignite node, String sql, int expSize) {
-        SqlQuery qry = new SqlQuery(typeName(ValueClass.class), sql).setArgs(SQL_ARG_1);
+        try {
+            SqlQuery qry = new SqlQuery(typeName(ValueClass.class), sql).setArgs(SQL_ARG_1);
 
-        List<Cache.Entry<BinaryObject, BinaryObject>> res = node.cache(CACHE_NAME).withKeepBinary().query(qry).getAll();
+            List<Cache.Entry<BinaryObject, BinaryObject>> res = node.cache(CACHE_NAME).withKeepBinary().query(qry).getAll();
 
-        Set<Long> ids = new HashSet<>();
+            Set<Long> ids = new HashSet<>();
 
-        for (Cache.Entry<BinaryObject, BinaryObject> entry : res) {
-            long id = entry.getKey().field(FIELD_KEY);
+            for (Cache.Entry<BinaryObject, BinaryObject> entry : res) {
+                long id = entry.getKey().field(FIELD_KEY);
 
-            long field1 = entry.getValue().field(FIELD_NAME_1_ESCAPED);
-            long field2 = entry.getValue().field(FIELD_NAME_2_ESCAPED);
+                long field1 = entry.getValue().field(FIELD_NAME_1_ESCAPED);
+                long field2 = entry.getValue().field(FIELD_NAME_2_ESCAPED);
 
-            assertTrue(field1 >= SQL_ARG_1);
+                assertTrue(field1 >= SQL_ARG_1);
 
-            assertEquals(id, field1);
-            assertEquals(id, field2);
+                assertEquals(id, field1);
+                assertEquals(id, field2);
 
-            assertTrue(ids.add(id));
-        }
+                assertTrue(ids.add(id));
+            }
 
-        assertEquals("Size mismatch [node=" + node.name() + ", exp=" + expSize + ", actual=" + res.size() +
-            ", ids=" + ids + ']', expSize, res.size());
+            assertEquals("Size mismatch [node=" + node.name() + ", exp=" + expSize + ", actual=" + res.size() +
+                ", ids=" + ids + ']', expSize, res.size());
+        }
+        catch (Exception e) {
+            // Swallow QueryRetryException.
+            if (X.cause(e, QueryRetryException.class) == null)
+                throw e;
+        }
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
index d7f0470..f7c3171 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
@@ -191,7 +191,7 @@ public class H2ConnectionLeaksSelfTest extends AbstractIndexingCommonTest {
                 Map<Thread, ?> conns = perThreadConnections(i);
 
                 for(Thread t : conns.keySet())
-                    log.error("+++ Connection is not closed for thread: " + t.getName());
+                    log.error("Connection is not closed for thread: " + t.getName());
             }
 
             fail("H2 JDBC connections leak detected. See the log above.");
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java
index 5c5c0eb..4631087 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/local/IgniteCacheLocalQueryCancelOrTimeoutSelfTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -147,7 +148,7 @@ public class IgniteCacheLocalQueryCancelOrTimeoutSelfTest extends GridCommonAbst
             fail("Expecting timeout");
         }
         catch (Exception e) {
-            assertTrue("Must throw correct exception", e.getCause() instanceof QueryCancelledException);
+            assertNotNull("Must throw correct exception", X.cause(e, QueryCancelledException.class));
         }
 
         // Test must exit gracefully.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java
new file mode 100644
index 0000000..36bdae3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryRetryException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests for query execution check cases for correct table lock/unlock.
+ */
+public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends AbstractIndexingCommonTest {
+    /** Keys count. */
+    private static final int KEY_CNT = 500;
+
+    /** Base query argument. */
+    private static final int BASE_QRY_ARG = 50;
+
+    /** Size for small pages. */
+    private static final int PAGE_SIZE_SMALL = 12;
+
+    /** Test duration. */
+    private static final long TEST_DUR = 10_000L;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Test local query execution.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSingleNode() throws Exception {
+        checkSingleNode(1);
+    }
+
+    /**
+     * Test local query execution.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSingleNodeWithParallelism() throws Exception {
+        checkSingleNode(4);
+    }
+
+    /**
+     * Test query execution with multiple topology nodes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultipleNodes() throws Exception {
+        checkMultipleNodes(1);
+    }
+
+    /**
+     * Test query execution with multiple topology nodes with query parallelism.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultipleNodesWithParallelism() throws Exception {
+        checkMultipleNodes(4);
+    }
+
+    /**
+     * Test DDL operation on table with high load queries.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSingleNodeTablesLockQueryAndDDLMultithreaded() throws Exception {
+        final Ignite srv = startGrid(0);
+
+        populateBaseQueryData(srv, 1);
+
+        checkTablesLockQueryAndDDLMultithreaded(srv);
+
+        checkTablesLockQueryAndDropColumnMultithreaded(srv);
+    }
+
+    /**
+     * Test DDL operation on table with high load queries.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSingleNodeWithParallelismTablesLockQueryAndDDLMultithreaded() throws Exception {
+        final Ignite srv = startGrid(0);
+
+        populateBaseQueryData(srv, 4);
+
+        checkTablesLockQueryAndDDLMultithreaded(srv);
+
+        checkTablesLockQueryAndDropColumnMultithreaded(srv);
+    }
+
+    /**
+     * Test DDL operation on table with high load queries.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultipleNodesWithTablesLockQueryAndDDLMultithreaded() throws Exception {
+        Ignite srv0 = startGrid(0);
+        Ignite srv1 = startGrid(1);
+        startGrid(2);
+
+        Ignite cli;
+
+        try {
+            Ignition.setClientMode(true);
+
+            cli = startGrid(3);
+        }
+        finally {
+            Ignition.setClientMode(false);
+        }
+
+        populateBaseQueryData(srv0, 1);
+
+        checkTablesLockQueryAndDDLMultithreaded(srv0);
+        checkTablesLockQueryAndDDLMultithreaded(srv1);
+        checkTablesLockQueryAndDDLMultithreaded(cli);
+
+        checkTablesLockQueryAndDropColumnMultithreaded(srv0);
+        checkTablesLockQueryAndDropColumnMultithreaded(srv1);
+        // TODO: +++ DDL DROP COLUMN CacheContext == null on CLI
+        // checkTablesLockQueryAndDropColumnMultithreaded(cli);
+    }
+
+    /**
+     * Test DDL operation on table with high load queries.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultipleNodesWithParallelismTablesLockQueryAndDDLMultithreaded() throws Exception {
+        Ignite srv0 = startGrid(0);
+        Ignite srv1 = startGrid(1);
+        startGrid(2);
+
+        Ignite cli;
+
+        try {
+            Ignition.setClientMode(true);
+
+            cli = startGrid(3);
+        }
+        finally {
+            Ignition.setClientMode(false);
+        }
+
+        populateBaseQueryData(srv0, 4);
+
+        checkTablesLockQueryAndDDLMultithreaded(srv0);
+        checkTablesLockQueryAndDDLMultithreaded(srv1);
+        checkTablesLockQueryAndDDLMultithreaded(cli);
+
+        checkTablesLockQueryAndDropColumnMultithreaded(srv0);
+        checkTablesLockQueryAndDropColumnMultithreaded(srv1);
+        // TODO: +++ DDL DROP COLUMN CacheContext == null on CLI
+        // checkTablesLockQueryAndDropColumnMultithreaded(cli);
+    }
+
+    /**
+     * Test release reserved partition after query complete (results is bigger than one page).
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReleasePartitionReservationSeveralPagesResults() throws Exception {
+        checkReleasePartitionReservation(PAGE_SIZE_SMALL);
+    }
+
+    /**
+     * Test release reserved partition after query complete (results is placed on one page).
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReleasePartitionReservationOnePageResults() throws Exception {
+        checkReleasePartitionReservation(KEY_CNT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFetchFromRemovedTable() throws Exception {
+        Ignite srv = startGrid(0);
+
+        execute(srv, "CREATE TABLE TEST (id int primary key, val int)");
+
+        for (int i = 0; i < 10; ++i)
+            execute(srv, "INSERT INTO TEST VALUES (" + i + ", " + i + ")");
+
+        FieldsQueryCursor<List<?>> cur = execute(srv, new SqlFieldsQuery("SELECT * from TEST").setPageSize(1));
+
+        Iterator<List<?>> it = cur.iterator();
+
+        it.next();
+
+        execute(srv, "DROP TABLE TEST");
+
+        try {
+            while (it.hasNext())
+                it.next();
+
+            if (lazy())
+                fail("Retry exception must be thrown");
+        }
+        catch (Exception e) {
+            if (!lazy()) {
+                log.error("In lazy=false mode the query must be finished successfully", e);
+
+                fail("In lazy=false mode the query must be finished successfully");
+            }
+            else
+                assertNotNull(X.cause(e, QueryRetryException.class));
+        }
+    }
+
+    /**
+     * @param node Ignite node to execute query.
+     * @throws Exception If failed.
+     */
+    private void checkTablesLockQueryAndDDLMultithreaded(final Ignite node) throws Exception {
+        final AtomicBoolean end = new AtomicBoolean(false);
+
+        final int qryThreads = 10;
+
+        // Do many concurrent queries.
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                while(!end.get()) {
+                    try {
+                        FieldsQueryCursor<List<?>> cursor = execute(node, new SqlFieldsQuery(
+                            "SELECT pers.id, pers.name " +
+                            "FROM (SELECT DISTINCT p.id, p.name " +
+                            "FROM \"pers\".PERSON as p) as pers " +
+                            "JOIN \"pers\".PERSON p on p.id = pers.id " +
+                            "JOIN (SELECT t.persId as persId, SUM(t.time) totalTime " +
+                            "FROM \"persTask\".PersonTask as t GROUP BY t.persId) as task ON task.persId = pers.id")
+                            .setLazy(lazy())
+                            .setPageSize(PAGE_SIZE_SMALL));
+
+                        cursor.getAll();
+                    }
+                    catch (Exception e) {
+                        if(X.cause(e, QueryRetryException.class) == null) {
+                            log.error("Unexpected exception", e);
+
+                            fail("Unexpected exception. " + e);
+                        }
+                        else if (!lazy()) {
+                            log.error("Unexpected exception", e);
+
+                            fail("Unexpected QueryRetryException.");
+                        }
+                    }
+                }
+            }
+        }, qryThreads, "usr-qry");
+
+        long tEnd = U.currentTimeMillis() + TEST_DUR;
+
+        while (U.currentTimeMillis() < tEnd) {
+            execute(node, new SqlFieldsQuery("CREATE INDEX \"pers\".PERSON_NAME ON \"pers\".Person (name asc)")).getAll();
+            execute(node, new SqlFieldsQuery("DROP INDEX \"pers\".PERSON_NAME")).getAll();
+        }
+
+        // Test is OK in case DDL operations is passed on hi load queries pressure.
+        end.set(true);
+        fut.get();
+    }
+
+    /**
+     * @param node Ignite node to execute query.
+     * @throws Exception If failed.
+     */
+    private void checkTablesLockQueryAndDropColumnMultithreaded(final Ignite node) throws Exception {
+        final AtomicBoolean end = new AtomicBoolean(false);
+
+        final int qryThreads = 10;
+
+        // Do many concurrent queries.
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                while(!end.get()) {
+                    try {
+                        FieldsQueryCursor<List<?>> cursor = execute(node, new SqlFieldsQuery(
+                            "SELECT pers.id, pers.name FROM \"pers\".PERSON")
+                            .setLazy(lazy())
+                            .setPageSize(PAGE_SIZE_SMALL));
+
+                        cursor.getAll();
+                    }
+                    catch (Exception e) {
+                        if (e.getMessage().contains("Failed to parse query. Column \"PERS.ID\" not found")) {
+                            // Swallow exception when column is dropped.
+                        }
+                        else if(X.cause(e, QueryRetryException.class) == null) {
+                            log.error("Unexpected exception", e);
+
+                            fail("Unexpected exception. " + e);
+                        }
+                        else if (!lazy()) {
+                            log.error("Unexpected exception", e);
+
+                            fail("Unexpected QueryRetryException.");
+                        }
+                    }
+                }
+            }
+        }, qryThreads, "usr-qry");
+
+
+        long tEnd = U.currentTimeMillis() + TEST_DUR;
+
+        while (U.currentTimeMillis() < tEnd) {
+            execute(node, new SqlFieldsQuery("ALTER TABLE \"pers\".Person DROP COLUMN name")).getAll();
+            execute(node, new SqlFieldsQuery("ALTER TABLE \"pers\".Person ADD  COLUMN name varchar")).getAll();
+        }
+
+        // Test is OK in case DDL operations is passed on hi load queries pressure.
+        end.set(true);
+        fut.get();
+    }
+
+    /**
+     * Test release reserved partition after query complete.
+     * In case partitions not released the `awaitPartitionMapExchange` fails by timeout.
+     *
+     * @param pageSize Results page size.
+     * @throws Exception If failed.
+     */
+    public void checkReleasePartitionReservation(int pageSize) throws Exception {
+        Ignite srv1 = startGrid(1);
+        startGrid(2);
+
+        populateBaseQueryData(srv1, 1);
+
+        FieldsQueryCursor<List<?>> cursor = execute(srv1, query(0).setPageSize(pageSize));
+
+        cursor.getAll();
+
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * Check local query execution.
+     *
+     * @param parallelism Query parallelism.
+     * @throws Exception If failed.
+     */
+    public void checkSingleNode(int parallelism) throws Exception {
+        Ignite srv = startGrid();
+
+        populateBaseQueryData(srv, parallelism);
+
+        checkBaseOperations(srv);
+    }
+
+    /**
+     * Check query execution with multiple topology nodes.
+     *
+     * @param parallelism Query parallelism.
+     * @throws Exception If failed.
+     */
+    public void checkMultipleNodes(int parallelism) throws Exception {
+        Ignite srv1 = startGrid(1);
+        Ignite srv2 = startGrid(2);
+
+        Ignite cli;
+
+        try {
+            Ignition.setClientMode(true);
+
+            cli = startGrid(3);
+        }
+        finally {
+            Ignition.setClientMode(false);
+        }
+
+        populateBaseQueryData(cli, parallelism);
+
+        checkBaseOperations(srv1);
+        checkBaseOperations(srv2);
+        checkBaseOperations(cli);
+
+        // Test originating node leave.
+        FieldsQueryCursor<List<?>> cursor = execute(cli, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+        Iterator<List<?>> iter = cursor.iterator();
+
+        for (int i = 0; i < 30; i++)
+            iter.next();
+
+        stopGrid(3);
+
+        // Test server node leave with active worker.
+        FieldsQueryCursor<List<?>> cursor2 = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+        try {
+            Iterator<List<?>> iter2 = cursor2.iterator();
+
+            for (int i = 0; i < 30; i++)
+                iter2.next();
+
+            stopGrid(2);
+        }
+        finally {
+            cursor2.close();
+        }
+    }
+
+    /**
+     * Check base operations.
+     *
+     * @param node Node.
+     * @throws Exception If failed.
+     */
+    private void checkBaseOperations(Ignite node) throws Exception {
+        checkQuerySplitToSeveralMapQueries(node);
+
+        // Get full data.
+        {
+            List<List<?>> rows = execute(node, baseQuery()).getAll();
+
+            assertBaseQueryResults(rows);
+        }
+
+        // Check QueryRetryException is thrown
+        {
+            List<List<?>> rows = new ArrayList<>();
+
+            FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+            Iterator<List<?>> it = cursor.iterator();
+
+            for (int i = 0; i < 10; ++i)
+                rows.add(it.next());
+
+            execute(node, new SqlFieldsQuery("CREATE INDEX \"pers\".PERSON_NAME ON \"pers\".Person (name asc)")).getAll();
+            execute(node, new SqlFieldsQuery("DROP INDEX \"pers\".PERSON_NAME")).getAll();
+
+            try {
+                while (it.hasNext())
+                    rows.add(it.next());
+
+                if (lazy())
+                    fail("Retry exception must be thrown");
+            }
+            catch (Exception e) {
+                if (!lazy() || X.cause(e, QueryRetryException.class) == null) {
+                    log.error("Invalid exception: ", e);
+
+                    fail("QueryRetryException is expected");
+                }
+            }
+        }
+
+        // Get data in several pages.
+        {
+            List<List<?>> rows = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)).getAll();
+
+            assertBaseQueryResults(rows);
+        }
+
+
+        // Test full iteration.
+        {
+            List<List<?>> rows = new ArrayList<>();
+
+            FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+            for (List<?> row : cursor)
+                rows.add(row);
+
+            cursor.close();
+
+            assertBaseQueryResults(rows);
+        }
+
+        // Test partial iteration with cursor close.
+        try (FieldsQueryCursor<List<?>> partialCursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL))) {
+            Iterator<List<?>> iter = partialCursor.iterator();
+
+            for (int i = 0; i < 30; i++)
+                iter.next();
+        }
+
+
+        // Test execution of multiple queries at a time.
+        List<Iterator<List<?>>> iters = new ArrayList<>();
+
+        for (int i = 0; i < 200; i++)
+            iters.add(execute(node, randomizedQuery().setPageSize(PAGE_SIZE_SMALL)).iterator());
+
+        while (!iters.isEmpty()) {
+            Iterator<Iterator<List<?>>> iterIter = iters.iterator();
+
+            while (iterIter.hasNext()) {
+                Iterator<List<?>> iter = iterIter.next();
+
+                int i = 0;
+
+                while (iter.hasNext() && i < 20) {
+                    iter.next();
+
+                    i++;
+                }
+
+                if (!iter.hasNext())
+                    iterIter.remove();
+            }
+        }
+
+        checkHoldQuery(node);
+
+        checkShortQuery(node);
+    }
+
+    /**
+     * @param node Ignite node.
+     * @throws Exception If failed.
+     */
+    public void checkHoldQuery(Ignite node) throws Exception {
+        ArrayList rows = new ArrayList<>();
+
+        Iterator<List<?>> it0 = execute(node, query(BASE_QRY_ARG).setPageSize(PAGE_SIZE_SMALL)).iterator();
+        rows.add(it0.next());
+
+        // Do many concurrent queries to Test full iteration.
+        GridTestUtils.runMultiThreaded(new Runnable() {
+            @Override public void run() {
+                for (int i = 0; i < 5; ++i) {
+                    FieldsQueryCursor<List<?>> cursor = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1)
+                        .setPageSize(PAGE_SIZE_SMALL));
+
+                    cursor.getAll();
+                }
+            }
+        }, 5, "test-qry");
+
+        // Do the same query in the same thread.
+        {
+            FieldsQueryCursor<List<?>> cursor = execute(node, query(BASE_QRY_ARG)
+                .setPageSize(PAGE_SIZE_SMALL));
+
+            cursor.getAll();
+        }
+
+        while (it0.hasNext())
+            rows.add(it0.next());
+
+        assertBaseQueryResults(rows);
+    }
+
+    /**
+     * @param node Ignite node.
+     * @throws Exception If failed.
+     */
+    public void checkShortQuery(Ignite node) throws Exception {
+        ArrayList rows = new ArrayList<>();
+
+        FieldsQueryCursor<List<?>> cursor0 = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1).setPageSize(PAGE_SIZE_SMALL));
+
+        Iterator<List<?>> it = cursor0.iterator();
+
+        while (it.hasNext())
+            rows.add(it.next());
+
+        assertQueryResults(rows, KEY_CNT - PAGE_SIZE_SMALL + 1);
+    }
+
+    /**
+     * @param node Ignite node.
+     * @throws Exception If failed.
+     */
+    public void checkQuerySplitToSeveralMapQueries(Ignite node) throws Exception {
+        ArrayList rows = new ArrayList<>();
+
+        FieldsQueryCursor<List<?>> cursor0 = execute(node, new SqlFieldsQuery(
+            "SELECT pers.id, pers.name " +
+            "FROM (SELECT DISTINCT p.id, p.name " +
+                "FROM \"pers\".PERSON as p) as pers " +
+            "JOIN \"pers\".PERSON p on p.id = pers.id " +
+            "JOIN (SELECT t.persId as persId, SUM(t.time) totalTime " +
+                "FROM \"persTask\".PersonTask as t GROUP BY t.persId) as task ON task.persId = pers.id")
+            .setPageSize(PAGE_SIZE_SMALL));
+
+        Iterator<List<?>> it = cursor0.iterator();
+
+        while (it.hasNext())
+            rows.add(it.next());
+
+        assertQueryResults(rows, 0);
+    }
+
+    /**
+     * Populate base query data.
+     *
+     * @param node Node.
+     * @param parallelism Query parallelism.
+     */
+    private static void populateBaseQueryData(Ignite node, int parallelism) {
+        node.createCache(cacheConfiguration(parallelism, "pers", Person.class));
+        node.createCache(cacheConfiguration(parallelism, "persTask", PersonTask.class));
+
+        IgniteCache<Long, Object> pers = cache(node, "pers");
+
+        for (long i = 0; i < KEY_CNT; i++)
+            pers.put(i, new Person(i));
+
+        IgniteCache<Long, Object> comp = cache(node, "persTask");
+
+        for (long i = 0; i < KEY_CNT; i++)
+            comp.put(i, new PersonTask(i));
+    }
+
+    /**
+     * @return Query with randomized argument.
+     */
+    private static SqlFieldsQuery randomizedQuery() {
+        return query(ThreadLocalRandom.current().nextInt(KEY_CNT / 2));
+    }
+
+    /**
+     * @return Base query.
+     */
+    private static SqlFieldsQuery baseQuery() {
+        return query(BASE_QRY_ARG);
+    }
+
+    /**
+     * @param parallelism Query parallelism.
+     * @param name Cache name.
+     * @param valClass Value class.
+     * @return Default cache configuration.
+     */
+    private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism, String name, Class valClass) {
+        return new CacheConfiguration<Long, Person>()
+            .setName(name)
+            .setIndexedTypes(Long.class, valClass)
+            .setQueryParallelism(parallelism)
+            .setAffinity(new RendezvousAffinityFunction(false, 10));
+    }
+
+    /**
+     * Default query.
+     *
+     * @param arg Argument.
+     * @return Query.
+     */
+    private static SqlFieldsQuery query(long arg) {
+        return new SqlFieldsQuery(
+            "SELECT id, name FROM \"pers\".Person WHERE id >= " + arg);
+    }
+
+    /**
+     * Assert base query results.
+     *
+     * @param rows Result rows.
+     */
+    private static void assertBaseQueryResults(List<List<?>> rows) {
+        assertQueryResults(rows, BASE_QRY_ARG);
+    }
+
+    /**
+     * Assert base query results.
+     *
+     * @param rows Result rows.
+     * @param resSize Result size.
+     */
+    private static void assertQueryResults(List<List<?>> rows, int resSize) {
+        assertEquals(KEY_CNT - resSize, rows.size());
+
+        for (List<?> row : rows) {
+            Long id = (Long)row.get(0);
+            String name = (String)row.get(1);
+
+            assertTrue(id >= resSize);
+            assertEquals(nameForId(id), name);
+        }
+    }
+
+    /**
+     * Get cache for node.
+     *
+     * @param node Ignite node to get cache.
+     * @param name Cache name.
+     * @return Cache.
+     */
+    private static IgniteCache<Long, Object> cache(Ignite node, String name) {
+        return node.cache(name);
+    }
+
+    /**
+     * Execute query on the given cache.
+     *
+     * @param node Node.
+     * @param sql Query.
+     * @return Cursor.
+     */
+    private FieldsQueryCursor<List<?>> execute(Ignite node, String sql) {
+        return ((IgniteEx)node).context().query().querySqlFields(new SqlFieldsQuery(sql).setLazy(lazy()), false);
+    }
+
+    /**
+     * Execute query on the given cache.
+     *
+     * @param node Node.
+     * @param qry Query.
+     * @return Cursor.
+     */
+    private FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) {
+        return ((IgniteEx)node).context().query().querySqlFields(qry.setLazy(lazy()), false);
+    }
+
+
+    /**
+     * @return Lazy mode.
+     */
+    protected abstract boolean lazy();
+
+    /**
+     * Get name for ID.
+     *
+     * @param id ID.
+     * @return Name.
+     */
+    private static String nameForId(long id) {
+        return "name-" + id;
+    }
+
+    /**
+     * Person class.
+     */
+    private static class Person {
+        /** ID. */
+        @QuerySqlField(index = true)
+        private long id;
+
+        /** Name. */
+        @QuerySqlField
+        private String name;
+
+        /**
+         * Constructor.
+         *
+         * @param id ID.
+         */
+        public Person(long id) {
+            this.id = id;
+            this.name = nameForId(id);
+        }
+
+        /**
+         * @return ID.
+         */
+        public long id() {
+            return id;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String name() {
+            return name;
+        }
+    }
+
+    /**
+     * Company class.
+     */
+    private static class PersonTask {
+        /** ID. */
+        @QuerySqlField(index = true)
+        private long id;
+
+        @QuerySqlField(index = true)
+        private long persId;
+
+        /** Name. */
+        @QuerySqlField
+        private long time;
+
+        /**
+         * Constructor.
+         *
+         * @param id ID.
+         */
+        public PersonTask(long id) {
+            this.id = id;
+            this.persId = id;
+            this.time = id;
+        }
+
+        /**
+         * @return ID.
+         */
+        public long id() {
+            return id;
+        }
+
+        /**
+         * @return Name.
+         */
+        public long time() {
+            return time;
+        }
+    }
+}
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOffTest.java
similarity index 71%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOffTest.java
index 0a6181e..eabb076 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOffTest.java
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.testsuites;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+package org.apache.ignite.internal.processors.query;
 
 /**
- * Special test suite with ignored tests for Binary mode.
+ * Tests for query execution with disabled lazy mode.
  */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+public class IgniteQueryTableLockAndConnectionPoolLazyModeOffTest
+    extends AbstractQueryTableLockAndConnectionPoolSelfTest {
+    /** {@inheritDoc} */
+    @Override protected boolean lazy() {
+        return false;
+    }
 }
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOnTest.java
similarity index 72%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOnTest.java
index 0a6181e..1027880 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteQueryTableLockAndConnectionPoolLazyModeOnTest.java
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.testsuites;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+package org.apache.ignite.internal.processors.query;
 
 /**
- * Special test suite with ignored tests for Binary mode.
+ * Tests for lazy query execution.
  */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+public class IgniteQueryTableLockAndConnectionPoolLazyModeOnTest
+    extends AbstractQueryTableLockAndConnectionPoolSelfTest {
+    /** {@inheritDoc} */
+    @Override protected boolean lazy() {
+        return true;
+    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
deleted file mode 100644
index 9128a55..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.junit.Test;
-
-/**
- * Tests for lazy query execution.
- */
-public class LazyQuerySelfTest extends AbstractIndexingCommonTest {
-    /** Keys ocunt. */
-    private static final int KEY_CNT = 200;
-
-    /** Base query argument. */
-    private static final int BASE_QRY_ARG = 50;
-
-    /** Size for small pages. */
-    private static final int PAGE_SIZE_SMALL = 12;
-
-    /** Cache name. */
-    private static final String CACHE_NAME = "cache";
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * Test local query execution.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testSingleNode() throws Exception {
-        checkSingleNode(1);
-    }
-
-    /**
-     * Test local query execution.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testSingleNodeWithParallelism() throws Exception {
-        checkSingleNode(4);
-    }
-
-    /**
-     * Test query execution with multiple topology nodes.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testMultipleNodes() throws Exception {
-        checkMultipleNodes(1);
-    }
-
-    /**
-     * Test query execution with multiple topology nodes with query parallelism.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testMultipleNodesWithParallelism() throws Exception {
-        checkMultipleNodes(4);
-    }
-
-    /**
-     * Check local query execution.
-     *
-     * @param parallelism Query parallelism.
-     * @throws Exception If failed.
-     */
-    public void checkSingleNode(int parallelism) throws Exception {
-        Ignite srv = startGrid();
-
-        srv.createCache(cacheConfiguration(parallelism));
-
-        populateBaseQueryData(srv);
-
-        checkBaseOperations(srv);
-    }
-
-    /**
-     * Check query execution with multiple topology nodes.
-     *
-     * @param parallelism Query parallelism.
-     * @throws Exception If failed.
-     */
-    public void checkMultipleNodes(int parallelism) throws Exception {
-        Ignite srv1 = startGrid(1);
-        Ignite srv2 = startGrid(2);
-
-        Ignite cli;
-
-        try {
-            Ignition.setClientMode(true);
-
-            cli = startGrid(3);
-        }
-        finally {
-            Ignition.setClientMode(false);
-        }
-
-        cli.createCache(cacheConfiguration(parallelism));
-
-        populateBaseQueryData(cli);
-
-        checkBaseOperations(srv1);
-        checkBaseOperations(srv2);
-        checkBaseOperations(cli);
-
-        // Test originating node leave.
-        FieldsQueryCursor<List<?>> cursor = execute(cli, baseQuery().setPageSize(PAGE_SIZE_SMALL));
-
-        Iterator<List<?>> iter = cursor.iterator();
-
-        for (int i = 0; i < 30; i++)
-            iter.next();
-
-        stopGrid(3);
-
-        assertNoWorkers();
-
-        // Test server node leave with active worker.
-        cursor = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
-
-        try {
-            iter = cursor.iterator();
-
-            for (int i = 0; i < 30; i++)
-                iter.next();
-
-            stopGrid(2);
-        }
-        finally {
-            cursor.close();
-        }
-
-        assertNoWorkers();
-    }
-
-    /**
-     * Check base operations.
-     *
-     * @param node Node.
-     * @throws Exception If failed.
-     */
-    private void checkBaseOperations(Ignite node) throws Exception {
-        // Get full data.
-        List<List<?>> rows = execute(node, baseQuery()).getAll();
-
-        assertBaseQueryResults(rows);
-        assertNoWorkers();
-
-        // Get data in several pages.
-        rows = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)).getAll();
-
-        assertBaseQueryResults(rows);
-        assertNoWorkers();
-
-        // Test full iteration.
-        rows = new ArrayList<>();
-
-        FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL));
-
-        for (List<?> row : cursor)
-            rows.add(row);
-
-        assertBaseQueryResults(rows);
-        assertNoWorkers();
-
-        // Test partial iteration with cursor close.
-        try (FieldsQueryCursor<List<?>> partialCursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL))) {
-            Iterator<List<?>> iter = partialCursor.iterator();
-
-            for (int i = 0; i < 30; i++)
-                iter.next();
-        }
-
-        assertNoWorkers();
-
-        // Test execution of multiple queries at a time.
-        List<Iterator<List<?>>> iters = new ArrayList<>();
-
-        for (int i = 0; i < 200; i++)
-            iters.add(execute(node, randomizedQuery().setPageSize(PAGE_SIZE_SMALL)).iterator());
-
-        while (!iters.isEmpty()) {
-            Iterator<Iterator<List<?>>> iterIter = iters.iterator();
-
-            while (iterIter.hasNext()) {
-                Iterator<List<?>> iter = iterIter.next();
-
-                int i = 0;
-
-                while (iter.hasNext() && i < 20) {
-                    iter.next();
-
-                    i++;
-                }
-
-                if (!iter.hasNext())
-                    iterIter.remove();
-            }
-        }
-
-        assertNoWorkers();
-    }
-
-    /**
-     * Populate base query data.
-     *
-     * @param node Node.
-     */
-    private static void populateBaseQueryData(Ignite node) {
-        IgniteCache<Long, Person> cache = cache(node);
-
-        for (long i = 0; i < KEY_CNT; i++)
-            cache.put(i, new Person(i));
-    }
-
-    /**
-     * @return Query with randomized argument.
-     */
-    private static SqlFieldsQuery randomizedQuery() {
-        return query(ThreadLocalRandom.current().nextInt(KEY_CNT / 2));
-    }
-
-    /**
-     * @return Base query.
-     */
-    private static SqlFieldsQuery baseQuery() {
-        return query(BASE_QRY_ARG);
-    }
-
-    /**
-     * @param parallelism Query parallelism.
-     * @return Default cache configuration.
-     */
-    private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism) {
-        return new CacheConfiguration<Long, Person>().setName(CACHE_NAME).setIndexedTypes(Long.class, Person.class)
-            .setQueryParallelism(parallelism);
-    }
-
-    /**
-     * Default query.
-     *
-     * @param arg Argument.
-     * @return Query.
-     */
-    private static SqlFieldsQuery query(long arg) {
-        return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= ?").setArgs(arg);
-    }
-
-    /**
-     * Assert base query results.
-     *
-     * @param rows Result rows.
-     */
-    private static void assertBaseQueryResults(List<List<?>> rows) {
-        assertEquals(KEY_CNT - BASE_QRY_ARG, rows.size());
-
-        for (List<?> row : rows) {
-            Long id = (Long)row.get(0);
-            String name = (String)row.get(1);
-
-            assertTrue(id >= BASE_QRY_ARG);
-            assertEquals(nameForId(id), name);
-        }
-    }
-
-    /**
-     * Get cache for node.
-     *
-     * @param node Node.
-     * @return Cache.
-     */
-    private static IgniteCache<Long, Person> cache(Ignite node) {
-        return node.cache(CACHE_NAME);
-    }
-
-    /**
-     * Execute query on the given cache.
-     *
-     * @param node Node.
-     * @param qry Query.
-     * @return Cursor.
-     */
-    private static FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) {
-        return cache(node).query(qry.setLazy(true));
-    }
-
-    /**
-     * Make sure that are no active lazy workers.
-     *
-     * @throws Exception If failed.
-     */
-    private static void assertNoWorkers() throws Exception {
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                for (Ignite node : Ignition.allGrids()) {
-                    IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing();
-
-                    if (idx.mapQueryExecutor().registeredLazyWorkers() != 0)
-                        return false;
-                }
-
-                return MapQueryLazyWorker.activeCount() == 0;
-            }
-        }, 1000L);
-    }
-
-    /**
-     * Get name for ID.
-     *
-     * @param id ID.
-     * @return Name.
-     */
-    private static String nameForId(long id) {
-        return "name-" + id;
-    }
-
-    /**
-     * Person class.
-     */
-    private static class Person {
-        /** ID. */
-        @QuerySqlField(index = true)
-        private long id;
-
-        /** Name. */
-        @QuerySqlField
-        private String name;
-
-        /**
-         * Constructor.
-         *
-         * @param id ID.
-         */
-        public Person(long id) {
-            this.id = id;
-            this.name = nameForId(id);
-        }
-
-        /**
-         * @return ID.
-         */
-        public long id() {
-            return id;
-        }
-
-        /**
-         * @return Name.
-         */
-        public String name() {
-            return name;
-        }
-    }
-}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
index 8e50bf8..18ea78d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
@@ -366,7 +366,7 @@ public class QueryDataPageScanTest extends GridCommonAbstractTest {
                     .setLazy(true)
                     .setDataPageScanEnabled(DirectPageScanIndexing.expectedDataPageScanEnabled)
                     .setArgs(1, expNestedLoops, DirectPageScanIndexing.expectedDataPageScanEnabled)
-                    .setPageSize(keysCnt / 2) // Must be less than keysCnt.
+                    .setPageSize(keysCnt / 10) // Must be less than keysCnt.
             )
         ) {
             int nestedLoops = 0;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java
index 5ba8e68..4e276b2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java
@@ -44,18 +44,22 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTest
 public class DisappearedCacheCauseRetryMessageSelfTest extends AbstractIndexingCommonTest {
     /** */
     private static final int NODES_COUNT = 2;
+
     /** */
     private static final String ORG = "org";
+
     /** */
     private IgniteCache<String, JoinSqlTestHelper.Person> personCache;
+
     /** */
     private IgniteCache<String, JoinSqlTestHelper.Organization> orgCache;
 
     /** */
     @Test
     public void testDisappearedCacheCauseRetryMessage() {
-
-        SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0");
+        SqlQuery<String, JoinSqlTestHelper.Person> qry =
+            new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL)
+                .setArgs("Organization #0");
 
         qry.setDistributedJoins(true);
 
@@ -65,6 +69,9 @@ public class DisappearedCacheCauseRetryMessageSelfTest extends AbstractIndexingC
             fail("No CacheException emitted.");
         }
         catch (CacheException e) {
+            if (!e.getMessage().contains("Failed to reserve partitions for query (cache is not found on local node) ["))
+                e.printStackTrace();
+
             assertTrue(e.getMessage(), e.getMessage().contains("Failed to reserve partitions for query (cache is not found on local node) ["));
         }
     }
@@ -92,11 +99,11 @@ public class DisappearedCacheCauseRetryMessageSelfTest extends AbstractIndexingC
                         GridQueryCancelRequest req = (GridQueryCancelRequest) (gridMsg.message());
 
                         if (reqId == req.queryRequestId())
-                            orgCache = DisappearedCacheCauseRetryMessageSelfTest.this.ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG)
+                            orgCache = DisappearedCacheCauseRetryMessageSelfTest.this.ignite(0)
+                                .getOrCreateCache(new CacheConfiguration<String, Organization>(ORG)
                                 .setCacheMode(CacheMode.REPLICATED)
                                 .setQueryEntities(JoinSqlTestHelper.organizationQueryEntity())
                             );
-
                     }
                 }
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
index 65c37b0..f09cc5b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
@@ -402,31 +402,5 @@ public class RetryCauseMessageSelfTest extends AbstractIndexingCommonTest {
         @Override public void onMessage(UUID nodeId, Object msg) {
             startedExecutor.onMessage(nodeId, msg);
         }
-
-        /** {@inheritDoc} */
-        @Override public void cancelLazyWorkers() {
-            startedExecutor.cancelLazyWorkers();
-        }
-
-        /** {@inheritDoc} */
-        @Override GridSpinBusyLock busyLock() {
-            return startedExecutor.busyLock();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void stopAndUnregisterCurrentLazyWorker() {
-            startedExecutor.stopAndUnregisterCurrentLazyWorker();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void unregisterLazyWorker(MapQueryLazyWorker worker) {
-            startedExecutor.unregisterLazyWorker(worker);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int registeredLazyWorkers() {
-            return startedExecutor.registeredLazyWorkers();
-        }
     }
-
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryOOMTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryOOMTest.java
new file mode 100644
index 0000000..1d9b4a7
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryOOMTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.oom;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for OOME on query.
+ */
+@RunWith(JUnit4.class)
+public abstract class AbstractQueryOOMTest extends GridCommonAbstractTest {
+    /** */
+    private static final long KEY_CNT = 2_000_000L;
+
+    /** */
+    private static final String CACHE_NAME = "test_cache";
+
+    /** */
+    private static final String HAS_CACHE = "HAS_CACHE";
+
+    /** */
+    private static final int RMT_NODES_CNT = 3;
+
+    /** */
+    private static final long HANG_TIMEOUT =  15 * 60 * 1000;
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 30 * 60 * 1000; // 30 mins
+    }
+
+    /** {@inheritDoc} */
+    @Override protected List<String> additionalRemoteJvmArgs() {
+        return Arrays.asList("-Xmx128m");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean isRemoteJvm(String igniteInstanceName) {
+        return igniteInstanceName.startsWith("remote");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)))
+            .setCacheConfiguration(new CacheConfiguration()
+                .setName(CACHE_NAME)
+                .setNodeFilter(new TestNodeFilter())
+                .setBackups(0)
+                .setQueryParallelism(queryParallelism())
+                .setQueryEntities(Collections.singleton(new QueryEntity()
+                    .setTableName("test")
+                    .setKeyFieldName("ID")
+                    .setValueType(Value.class.getName())
+                    .addQueryField("ID", Long.class.getName(), null)
+                    .addQueryField("INDEXED", Long.class.getName(), null)
+                    .addQueryField("VAL", Long.class.getName(), null)
+                    .addQueryField("STR", String.class.getName(), null)
+                    .setIndexes(Collections.singleton(new QueryIndex("INDEXED"))))))
+            .setUserAttributes(igniteInstanceName.startsWith("remote") ? F.asMap(HAS_CACHE, true) : null)
+            .setClientMode(igniteInstanceName.startsWith("client"));
+    }
+
+    /**
+     * @return query parallelism value.
+     */
+    protected abstract int queryParallelism();
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        cleanPersistenceDir();
+
+        Ignite local = startGrid(0);
+
+        for (int i = 0; i < RMT_NODES_CNT; ++i)
+            startGrid("remote-" + i);
+
+        local.cluster().active(true);
+
+        try (IgniteDataStreamer streamer = local.dataStreamer(CACHE_NAME)) {
+            for (long i = 0; i < KEY_CNT; ++i) {
+                streamer.addData(i, new Value(i));
+
+                if (i % 100_000 == 0)
+                    log.info("Populate " + i + " values");
+            }
+        }
+
+        awaitPartitionMapExchange(true, true, null);
+
+        local.cluster().active(false);
+
+        stopAllGrids(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        cleanPersistenceDir();
+
+        stopAllGrids();
+    }
+
+    /**
+     * beforeTest is not user to save the time fot muted tests.
+     * @throws Exception On error.
+     */
+    private void startTestGrid() throws Exception {
+        log.info("Restart cluster");
+
+        Ignite loc = startGrid(0);
+
+        for (int i = 0; i < RMT_NODES_CNT; ++i)
+            startGrid("remote-" + i);
+
+        loc.cluster().active(true);
+
+        stopGrid(0, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(false);
+
+        IgniteProcessProxy.killAll();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testHeavyScanLazy() throws Exception {
+        startTestGrid();
+
+        checkQuery("SELECT * from test", KEY_CNT, true);
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+    @Test
+    public void testHeavyScanNonLazy() throws Exception {
+        startTestGrid();
+
+        checkQueryExpectOOM("SELECT * from test", false);
+    }
+
+    /**
+     * OOM on reduce. See IGNITE-9933
+     * @throws Exception On error.
+     */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9933")
+    @Test
+    public void testHeavySortByPkLazy() throws Exception {
+        startTestGrid();
+
+        checkQueryExpectOOM("SELECT * from test ORDER BY id", true);
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+    @Test
+    public void testHeavySortByPkNotLazy() throws Exception {
+        startTestGrid();
+
+        checkQueryExpectOOM("SELECT * from test ORDER BY id", false);
+    }
+
+    /**
+     * OOM on reduce. See IGNITE-9933
+     * @throws Exception On error.
+     */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9933")
+    @Test
+    public void testHeavySortByIndexLazy() throws Exception {
+        startTestGrid();
+
+        checkQueryExpectOOM("SELECT * from test ORDER BY indexed", true);
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+    @Test
+    public void testHeavySortByIndexNotLazy() throws Exception {
+        startTestGrid();
+
+        checkQueryExpectOOM("SELECT * from test ORDER BY indexed", false);
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+    @Test
+    public void testHeavySortByNotIndexLazy() throws Exception {
+        startTestGrid();
+
+        checkQueryExpectOOM("SELECT * from test ORDER BY STR", true);
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+    @Test
+    public void testHeavySortByNotIndexNotLazy() throws Exception {
+        startTestGrid();
+
+        checkQueryExpectOOM("SELECT * from test ORDER BY str", false);
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testHeavyGroupByPkLazy() throws Exception {
+        startTestGrid();
+
+        checkQuery("SELECT id, sum(val) from test GROUP BY id", KEY_CNT, true, true);
+    }
+
+    /**
+     * @throws Exception On error.
+     */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9480")
+    @Test
+    public void testHeavyGroupByPkNotLazy() throws Exception {
+
+        startTestGrid();
+
+        checkQueryExpectOOM("SELECT id, sum(val) from test GROUP BY id", false, true);
+    }
+
+
+    /**
+     * @param sql Query.
+     * @param lazy Lazy mode.
+     * @throws Exception On error.
+     */
+    private void checkQueryExpectOOM(String sql, boolean lazy) throws Exception {
+        checkQueryExpectOOM(sql, lazy, false);
+    }
+
+    /**
+     * @param sql Query.
+     * @param lazy Lazy mode.
+     * @param collocated Collocated GROUP BY mode.
+     * @throws Exception On error.
+     */
+    private void checkQueryExpectOOM(String sql, boolean lazy, boolean collocated) throws Exception {
+        final AtomicBoolean hangTimeout = new AtomicBoolean();
+        final AtomicBoolean hangCheckerEnd = new AtomicBoolean();
+
+        // Start grid hang checker.
+        // In some cases grid hangs (e.g. when OOME is thrown at the discovery thread).
+        IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
+            try {
+                long startTime = U.currentTimeMillis();
+
+                while (!hangCheckerEnd.get() && U.currentTimeMillis() - startTime < HANG_TIMEOUT)
+                    U.sleep(1000);
+
+                if (hangCheckerEnd.get())
+                    return;
+
+                hangTimeout.set(true);
+
+                log.info("Kill hung grids");
+
+                stopAllGrids();
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                fail("Unexpected interruption");
+            }
+        });
+
+        try {
+            checkQuery(sql, 0, lazy, collocated);
+        }
+        catch (Exception e) {
+            if (hangTimeout.get()) {
+                log.info("Grid hangs");
+
+                return;
+            }
+
+            if (e.getMessage().contains("Failed to execute SQL query. Out of memory"))
+                log.info("OOME is thrown");
+            else if (e.getMessage().contains("Failed to communicate with Ignite cluster"))
+                log.info("Node is down");
+            else
+                log.warning("Other error with OOME cause", e);
+        }
+        finally {
+            hangCheckerEnd.set(true);
+
+            fut.get();
+        }
+    }
+
+    /**
+     * @param sql Query.
+     * @param expectedRowCnt Expected row count.
+     * @param lazy Lazy mode.
+     * @throws Exception On failure.
+     */
+    public void checkQuery(String sql, long expectedRowCnt, boolean lazy) throws Exception {
+        checkQuery(sql, expectedRowCnt, lazy, false);
+    }
+
+    /**
+     * @param sql Query.
+     * @param expectedRowCnt Expected row count.
+     * @param lazy Lazy mode.
+     * @param collocated Collocated group by flag.
+     * @throws Exception On failure.
+     */
+    public void checkQuery(String sql, long expectedRowCnt, boolean lazy, boolean collocated) throws Exception {
+        try (Connection c = DriverManager.getConnection(
+            "jdbc:ignite:thin://127.0.0.1:10800..10850/\"test_cache\"" +
+                "?collocated=" + collocated +
+                "&lazy=" + lazy)) {
+            try (Statement stmt = c.createStatement()) {
+                log.info("Run heavy query: " + sql);
+
+                stmt.execute(sql);
+
+                ResultSet rs = stmt.getResultSet();
+
+                long cnt = 0;
+                while (rs.next())
+                    cnt++;
+
+                assertEquals("Invalid row count:", expectedRowCnt, cnt);
+            }
+        }
+    }
+
+    /** */
+    public static class Value {
+        /** Secondary ID. */
+        @QuerySqlField(index = true)
+        private long indexed;
+
+        /** Secondary ID. */
+        @QuerySqlField
+        private long val;
+
+        /** String value. */
+        @QuerySqlField
+        private String str;
+
+        /**
+         * @param id ID.
+         */
+        public Value(long id) {
+            indexed = id / 10;
+            val = id;
+            str = "value " + id;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return node.attribute(HAS_CACHE) != null;
+        }
+    }
+}
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/IgniteQueryOOMTestSuite.java
similarity index 75%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/IgniteQueryOOMTestSuite.java
index 0a6181e..d57a607 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/IgniteQueryOOMTestSuite.java
@@ -15,15 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.testsuites;
+package org.apache.ignite.internal.processors.query.oom;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
 /**
- * Special test suite with ignored tests for Binary mode.
+ * Test suite for queries produces OOME in some cases.
  */
 @RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@Suite.SuiteClasses({
+    //Query history.
+    QueryOOMWithoutQueryParallelismTest.class,
+    QueryOOMWithQueryParallelismTest.class,
+})
+public class IgniteQueryOOMTestSuite {
 }
+
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithQueryParallelismTest.java
similarity index 72%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithQueryParallelismTest.java
index 0a6181e..4a0404c 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithQueryParallelismTest.java
@@ -15,15 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.testsuites;
+package org.apache.ignite.internal.processors.query.oom;
 
 import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+import org.junit.runners.JUnit4;
 
 /**
- * Special test suite with ignored tests for Binary mode.
+ * Tests for OOME on query.
  */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@RunWith(JUnit4.class)
+public class QueryOOMWithQueryParallelismTest extends AbstractQueryOOMTest {
+    /** {@inheritDoc} */
+    @Override protected int queryParallelism() {
+        return 4;
+    }
 }
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithoutQueryParallelismTest.java
similarity index 72%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithoutQueryParallelismTest.java
index 0a6181e..2db821a 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryOOMWithoutQueryParallelismTest.java
@@ -15,15 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.testsuites;
+package org.apache.ignite.internal.processors.query.oom;
 
 import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+import org.junit.runners.JUnit4;
 
 /**
- * Special test suite with ignored tests for Binary mode.
+ * Tests for OOME on query.
  */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@RunWith(JUnit4.class)
+public class QueryOOMWithoutQueryParallelismTest extends AbstractQueryOOMTest {
+    /** {@inheritDoc} */
+    @Override protected int queryParallelism() {
+        return 1;
+    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 91d783f..359eff5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -177,6 +177,8 @@ import org.apache.ignite.internal.processors.client.ClientConnectorConfiguration
 import org.apache.ignite.internal.processors.database.baseline.IgniteStableBaselineBinObjFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.query.IgniteCachelessQueriesSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest;
+import org.apache.ignite.internal.processors.query.IgniteQueryTableLockAndConnectionPoolLazyModeOffTest;
+import org.apache.ignite.internal.processors.query.IgniteQueryTableLockAndConnectionPoolLazyModeOnTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlDefaultValueTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlDistributedJoinSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
@@ -193,7 +195,6 @@ import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTe
 import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
-import org.apache.ignite.internal.processors.query.LazyQuerySelfTest;
 import org.apache.ignite.internal.processors.query.MultipleStatementsSqlQuerySelfTest;
 import org.apache.ignite.internal.processors.query.RunningQueriesTest;
 import org.apache.ignite.internal.processors.query.SqlIllegalSchemaSelfTest;
@@ -300,7 +301,8 @@ import org.junit.runners.Suite;
     IgniteDynamicSqlRestoreTest.class,
 
     // Queries tests.
-    LazyQuerySelfTest.class,
+    IgniteQueryTableLockAndConnectionPoolLazyModeOnTest.class,
+    IgniteQueryTableLockAndConnectionPoolLazyModeOffTest.class,
     IgniteSqlSplitterSelfTest.class,
     SqlPushDownFunctionTest.class,
     IgniteSqlSegmentedIndexSelfTest.class,
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite.java
similarity index 59%
copy from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
copy to modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite.java
index 0a6181e..09915bb 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite.java
@@ -15,15 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.testsuites;
+package org.apache.ignite.testsuites.nightly;
 
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testsuites.IgniteBinaryCacheQueryTestSuite;
+import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
 /**
- * Special test suite with ignored tests for Binary mode.
+ * Test suite for cache queries with lazy mode.
  */
 @RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@Suite.SuiteClasses({
+    IgniteBinaryCacheQueryTestSuite.class,
+})
+public class IgniteBinaryCacheQueryLazyTestSuite {
+    /**
+     * Setup lazy mode default.
+     */
+    @BeforeClass
+    public static void setupLazy() {
+        GridTestUtils.setFieldValue(SqlFieldsQuery.class, "DFLT_LAZY", true);
+    }
 }
diff --git a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite2.java
similarity index 59%
rename from modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
rename to modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite2.java
index 0a6181e..164a725 100644
--- a/modules/ignored-tests/src/test/java/org/apache/ignite/testsuites/IgniteIgnoredBinarySimpleMapperTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/nightly/IgniteBinaryCacheQueryLazyTestSuite2.java
@@ -15,15 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.testsuites;
+package org.apache.ignite.testsuites.nightly;
 
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testsuites.IgniteBinaryCacheQueryTestSuite2;
+import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
 /**
- * Special test suite with ignored tests for Binary mode.
+ * Test suite for cache queries with lazy mode.
  */
 @RunWith(Suite.class)
-@Suite.SuiteClasses({IgniteBinarySimpleNameMapperCacheQueryTestSuite.class})
-public class IgniteIgnoredBinarySimpleMapperTestSuite {
+@Suite.SuiteClasses({
+    IgniteBinaryCacheQueryTestSuite2.class,
+})
+public class IgniteBinaryCacheQueryLazyTestSuite2 {
+    /**
+     * Setup lazy mode default.
+     */
+    @BeforeClass
+    public static void setupLazy() {
+        GridTestUtils.setFieldValue(SqlFieldsQuery.class, "DFLT_LAZY", true);
+    }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
index ef5623e..bee6ffd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
@@ -765,11 +765,11 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
         public void TestSqlQueryTimeout()
         {
             var cache = Cache();
-            PopulateCache(cache, false, 20000, x => true);
+            PopulateCache(cache, false, 30000, x => true);
 
-            var sqlQry = new SqlQuery(typeof(QueryPerson), "WHERE age < 500 AND name like '%1%'")
+            var sqlQry = new SqlQuery(typeof(QueryPerson), "WHERE age < 2000")
             {
-                Timeout = TimeSpan.FromMilliseconds(2)
+                Timeout = TimeSpan.FromMilliseconds(1)
             };
 
             // ReSharper disable once ReturnValueOfPureMethodIsNotUsed
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
index 02d13f6..760a48d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/SqlQueryTest.cs
@@ -150,9 +150,9 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
         {
             var cache = GetClientCache<Person>();
 
-            cache.PutAll(Enumerable.Range(1, 30000).ToDictionary(x => x, x => new Person(x)));
+            cache.PutAll(Enumerable.Range(1, 1000).ToDictionary(x => x, x => new Person(x)));
 
-            var qry = new SqlFieldsQuery("select * from Person where Name like '%ers%'")
+            var qry = new SqlFieldsQuery("select * from Person p0, Person p1, Person p2")
             {
                 Timeout = TimeSpan.FromMilliseconds(1)
             };
diff --git a/modules/platforms/nodejs/spec/query/SqlFieldsQuery.spec.js b/modules/platforms/nodejs/spec/query/SqlFieldsQuery.spec.js
index c81838a..bf75759 100644
--- a/modules/platforms/nodejs/spec/query/SqlFieldsQuery.spec.js
+++ b/modules/platforms/nodejs/spec/query/SqlFieldsQuery.spec.js
@@ -77,12 +77,31 @@ describe('sql fields query test suite >', () => {
             catch(error => done.fail(error));
     });
 
-    it('get all with page size', (done) => {
+    it('get all with page size lazy true', (done) => {
         Promise.resolve().
             then(async () => {
                 let cache = igniteClient.getCache(CACHE_NAME);
                 const cursor = await cache.query(
-                    new SqlFieldsQuery(`SELECT * FROM ${TABLE_NAME}`).setPageSize(1));
+                    new SqlFieldsQuery(`SELECT * FROM ${TABLE_NAME}`).setPageSize(1).setLazy(true));
+                const set = new Set();
+                for (let fields of await cursor.getAll()) {
+                    expect(fields.length).toBe(2);
+                    expect(generateValue(fields[0]) === fields[1]).toBe(true);
+                    set.add(fields[0]);
+                    expect(fields[0] >= 0 && fields[0] < ELEMENTS_NUMBER).toBe(true);
+                }
+                expect(set.size).toBe(ELEMENTS_NUMBER);
+            }).
+            then(done).
+            catch(error => done.fail(error));
+    });
+
+    it('get all with page size lazy false', (done) => {
+        Promise.resolve().
+            then(async () => {
+                let cache = igniteClient.getCache(CACHE_NAME);
+                const cursor = await cache.query(
+                    new SqlFieldsQuery(`SELECT * FROM ${TABLE_NAME}`).setPageSize(1).setLazy(false));
                 const set = new Set();
                 for (let fields of await cursor.getAll()) {
                     expect(fields.length).toBe(2);
diff --git a/modules/platforms/php/tests/SqlFieldsQueryTest.php b/modules/platforms/php/tests/SqlFieldsQueryTest.php
index 3e74ca0..802f738 100644
--- a/modules/platforms/php/tests/SqlFieldsQueryTest.php
+++ b/modules/platforms/php/tests/SqlFieldsQueryTest.php
@@ -64,10 +64,22 @@ final class SqlFieldsQueryTestCase extends TestCase
         $this->assertEquals($set->count(), self::ELEMENTS_NUMBER);
     }
 
-    public function testGetAllWithPageSize(): void
+    public function testGetAllWithPageSizeLazyTrue(): void
     {
         $cache = self::$cache;
-        $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(1));
+        $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(1)->setLazy(true));
+        $set = new Set();
+        foreach ($cursor->getAll() as $fields) {
+            $this->checkCursorResult($fields);
+            $set->add($fields[0]);
+        }
+        $this->assertEquals($set->count(), self::ELEMENTS_NUMBER);
+    }
+
+    public function testGetAllWithPageSizeLazyFalse(): void
+    {
+        $cache = self::$cache;
+        $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(1)->setLazy(false));
         $set = new Set();
         foreach ($cursor->getAll() as $fields) {
             $this->checkCursorResult($fields);
@@ -88,10 +100,22 @@ final class SqlFieldsQueryTestCase extends TestCase
         $this->assertEquals($set->count(), self::ELEMENTS_NUMBER);
     }
 
-    public function testIterateCursorWithPageSize(): void
+    public function testIterateCursorWithPageSizeLazyTrue(): void
+    {
+        $cache = self::$cache;
+        $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(2)->setLazy(true));
+        $set = new Set();
+        foreach ($cursor as $fields) {
+            $this->checkCursorResult($fields);
+            $set->add($fields[0]);
+        }
+        $this->assertEquals($set->count(), self::ELEMENTS_NUMBER);
+    }
+
+    public function testIterateCursorWithPageSizeLazyFalse(): void
     {
         $cache = self::$cache;
-        $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(2));
+        $cursor = $cache->query((new SqlFieldsQuery(self::$selectFromTable))->setPageSize(2)->setLazy(false));
         $set = new Set();
         foreach ($cursor as $fields) {
             $this->checkCursorResult($fields);
diff --git a/modules/yardstick/config/benchmark-native-sql-select-join.properties b/modules/yardstick/config/benchmark-native-sql-select-join.properties
index f32622a..dc280d1 100644
--- a/modules/yardstick/config/benchmark-native-sql-select-join.properties
+++ b/modules/yardstick/config/benchmark-native-sql-select-join.properties
@@ -24,8 +24,6 @@ JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false"
 
 # Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
 JVM_OPTS=${JVM_OPTS}" \
--Xms8g \
--Xmx8g \
 -Xloggc:./gc${now0}.log \
 -XX:+PrintGCDetails \
 -verbose:gc \
@@ -51,8 +49,8 @@ RESTART_SERVERS=true
 # BENCHMARK_WRITER=
 
 # The benchmark is applicable only for 1 server and 1 driver
-SERVER_HOSTS=""
-DRIVER_HOSTS=127.0.0.1
+SERVER_HOSTS="127.0.0.1,127.0.0.1"
+DRIVER_HOSTS="127.0.0.1,127.0.0.1,127.0.0.1"
 
 # Remote username.
 # REMOTE_USER=
diff --git a/modules/yardstick/config/benchmark-native-sql-select.properties b/modules/yardstick/config/benchmark-native-sql-select.properties
index 0f0b606..8da3541 100644
--- a/modules/yardstick/config/benchmark-native-sql-select.properties
+++ b/modules/yardstick/config/benchmark-native-sql-select.properties
@@ -24,8 +24,6 @@ JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false"
 
 # Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
 JVM_OPTS=${JVM_OPTS}" \
--Xms8g \
--Xmx8g \
 -Xloggc:./gc${now0}.log \
 -XX:+PrintGCDetails \
 -verbose:gc \
@@ -51,8 +49,8 @@ RESTART_SERVERS=true
 # BENCHMARK_WRITER=
 
 # The benchmark is applicable only for 1 server and 1 driver
-SERVER_HOSTS=127.0.0.1
-DRIVER_HOSTS=127.0.0.1
+SERVER_HOSTS="127.0.0.1,127.0.0.1"
+DRIVER_HOSTS="127.0.0.1,127.0.0.1,127.0.0.1"
 
 # Remote username.
 # REMOTE_USER=
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
index 2b6c186..23fd04f 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/JdbcUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.yardstick.jdbc;
 
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteEx;
@@ -31,43 +32,60 @@ import static org.yardstickframework.BenchmarkUtils.println;
 public class JdbcUtils {
     /**
      * Common method to fill test stand with data.
+     *
      * @param cfg Benchmark configuration.
      * @param ignite Ignite node.
      * @param range Data key range.
      */
-    public static void fillData(BenchmarkConfiguration cfg,  IgniteEx ignite, long range, CacheAtomicityMode atomicMode) {
-        println(cfg, "Create table...");
+    public static void fillData(BenchmarkConfiguration cfg, IgniteEx ignite, long range,
+        CacheAtomicityMode atomicMode) {
+        IgniteSemaphore sem = ignite.semaphore("sql-setup", 1, true, true);
 
-        StringBuilder qry = new StringBuilder("CREATE TABLE test_long (id LONG PRIMARY KEY, val LONG)" +
-            " WITH \"wrap_value=true");
+        try {
+            if (sem.tryAcquire()) {
+                println(cfg, "Create table...");
 
-        if (atomicMode != null)
-            qry.append(", atomicity=").append(atomicMode.name());
+                StringBuilder qry = new StringBuilder("CREATE TABLE test_long (id LONG PRIMARY KEY, val LONG)" +
+                    " WITH \"wrap_value=true");
 
-        qry.append("\";");
+                if (atomicMode != null)
+                    qry.append(", atomicity=").append(atomicMode.name());
 
-        String qryStr = qry.toString();
+                qry.append("\";");
 
-        println(cfg, "Creating table with schema: " + qryStr);
+                String qryStr = qry.toString();
 
-        GridQueryProcessor qProc = ignite.context().query();
+                println(cfg, "Creating table with schema: " + qryStr);
 
-        qProc.querySqlFields(
-            new SqlFieldsQuery(qryStr), true);
+                GridQueryProcessor qProc = ignite.context().query();
 
-        println(cfg, "Populate data...");
+                qProc.querySqlFields(
+                    new SqlFieldsQuery(qryStr), true);
 
-        for (long l = 1; l <= range; ++l) {
-            qProc.querySqlFields(
-                new SqlFieldsQuery("INSERT INTO test_long (id, val) VALUES (?, ?)")
-                    .setArgs(l, l + 1), true);
+                println(cfg, "Populate data...");
 
-            if (l % 10000 == 0)
-                println(cfg, "Populate " + l);
-        }
+                for (long l = 1; l <= range; ++l) {
+                    qProc.querySqlFields(
+                        new SqlFieldsQuery("INSERT INTO test_long (id, val) VALUES (?, ?)")
+                            .setArgs(l, l + 1), true);
+
+                    if (l % 10000 == 0)
+                        println(cfg, "Populate " + l);
+                }
+
+                qProc.querySqlFields(new SqlFieldsQuery("CREATE INDEX val_idx ON test_long (val)"), true);
 
-        qProc.querySqlFields(new SqlFieldsQuery("CREATE INDEX val_idx ON test_long (val)"), true);
+                println(cfg, "Finished populating data");
+            }
+            else {
+                // Acquire (wait setup by other client) and immediately release/
+                println(cfg, "Waits for setup...");
 
-        println(cfg, "Finished populating data");
+                sem.acquire();
+            }
+        }
+        finally {
+            sem.release();
+        }
     }
 }
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlJoinQueryRangeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlJoinQueryRangeBenchmark.java
index f660b8c..191d6a0 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlJoinQueryRangeBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/jdbc/NativeSqlJoinQueryRangeBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.ignite.yardstick.jdbc;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteEx;
@@ -50,7 +51,7 @@ public class NativeSqlJoinQueryRangeBenchmark extends IgniteAbstractBenchmark {
             qry = new SqlFieldsQuery("SELECT * FROM person p join organization o on p.orgId=o.id WHERE p.id = ? " +
                 "order by p.id");
 
-            qry.setArgs(ThreadLocalRandom.current().nextLong(args.range()) + 1);
+            qry.setArgs(ThreadLocalRandom.current().nextLong(args.range()));
 
             expRsSize = 1;
         }
@@ -63,7 +64,7 @@ public class NativeSqlJoinQueryRangeBenchmark extends IgniteAbstractBenchmark {
             qry = new SqlFieldsQuery("SELECT * FROM person p join organization o on p.orgId=o.id WHERE p.id BETWEEN ? AND ?" +
                 "order by p.id");
 
-            long id = ThreadLocalRandom.current().nextLong(args.range() - args.sqlRange()) + 1;
+            long id = ThreadLocalRandom.current().nextLong(args.range() - args.sqlRange());
             long maxId = id + args.sqlRange() - 1;
 
             qry.setArgs(id, maxId);
@@ -74,14 +75,16 @@ public class NativeSqlJoinQueryRangeBenchmark extends IgniteAbstractBenchmark {
         long rsSize = 0;
 
         try (FieldsQueryCursor<List<?>> cursor = ((IgniteEx)ignite()).context().query()
-                .querySqlFields(qry, false)) {
+            .querySqlFields(qry, false)) {
 
             for (List<?> r : cursor)
                 rsSize++;
         }
 
-        if (rsSize != expRsSize)
-            throw new Exception("Invalid result set size [actual=" + rsSize + ", expected=" + expRsSize + ']');
+        if (rsSize != expRsSize) {
+            throw new Exception("Invalid result set size [actual=" + rsSize + ", expected=" + expRsSize
+                + ", qry=" + qry + ']');
+        }
 
         return true;
     }
@@ -90,46 +93,61 @@ public class NativeSqlJoinQueryRangeBenchmark extends IgniteAbstractBenchmark {
     @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
         super.setUp(cfg);
 
-        qry = ((IgniteEx)ignite()).context().query();
+        IgniteSemaphore sem = ignite().semaphore("sql-setup", 1, true, true);
+
+        try {
+            if (sem.tryAcquire()) {
+                qry = ((IgniteEx)ignite()).context().query();
+
+                StringBuilder withExpr = new StringBuilder(" WITH \"AFFINITY_KEY=orgId,");
 
-        StringBuilder withExpr = new StringBuilder(" WITH \"AFFINITY_KEY=orgId,");
+                if (args.atomicMode() != null)
+                    withExpr.append("atomicity=").append(args.atomicMode().name()).append(",");
 
-        if (args.atomicMode() != null)
-            withExpr.append("atomicity=").append(args.atomicMode().name()).append(",");
+                if (args.partitionedCachesNumber() == 1)
+                    withExpr.append("template=replicated");
+                else
+                    withExpr.append("template=partitioned");
 
-        if (args.partitionedCachesNumber() == 1)
-            withExpr.append("template=replicated");
-        else
-            withExpr.append("template=partitioned");
+                withExpr.append("\"");
 
-        withExpr.append("\"");
+                qry.querySqlFields(
+                    new SqlFieldsQuery("CREATE TABLE person (id long, orgId long, name varchar, PRIMARY KEY (id, orgId))" + withExpr), true);
 
-        qry.querySqlFields(
-            new SqlFieldsQuery("CREATE TABLE person (id long, orgId long, name varchar, PRIMARY KEY (id, orgId))" + withExpr), true);
+                withExpr = new StringBuilder(" WITH \"");
 
-        withExpr = new StringBuilder(" WITH \"");
+                if (args.atomicMode() != null)
+                    withExpr.append("atomicity=").append(args.atomicMode().name()).append(",");
 
-        if (args.atomicMode() != null)
-            withExpr.append("atomicity=").append(args.atomicMode().name()).append(",");
+                withExpr.append("template=partitioned");
 
-        withExpr.append("template=partitioned");
+                withExpr.append("\"");
 
-        withExpr.append("\"");
+                qry.querySqlFields(
+                    new SqlFieldsQuery("CREATE TABLE organization (id long primary key, name varchar)" + withExpr), true);
 
-        qry.querySqlFields(
-            new SqlFieldsQuery("CREATE TABLE organization (id long primary key, name varchar)" + withExpr), true);
+                for (long k = 0; k <= args.range(); ++k) {
+                    qry.querySqlFields(new SqlFieldsQuery("insert into person (id, orgId, name) values (?, ?, ?)")
+                        .setArgs(k, k / 10, "person " + k), true).getAll();
 
-        for (long k = 1; k <= args.range(); ++k) {
-            qry.querySqlFields(new SqlFieldsQuery("insert into person (id, orgId, name) values (?, ?, ?)")
-                .setArgs(k, k / 10, "person " + k), true).getAll();
+                    if (k % 10 == 0) {
+                        qry.querySqlFields(new SqlFieldsQuery("insert into organization (id, name) values (?, ?)")
+                            .setArgs(k / 10, "organization " + k / 10), true).getAll();
+                    }
 
-            if (k % 10 == 0) {
-                qry.querySqlFields(new SqlFieldsQuery("insert into organization (id, name) values (?, ?)")
-                    .setArgs(k / 10, "organization " + k / 10), true).getAll();
+                    if (k % 10000 == 0)
+                        println(cfg, "Populate " + k);
+                }
             }
+            else {
+                // Acquire (wait setup by other client) and immediately release/
+                println(cfg, "Waits for setup...");
 
-            if (k % 10000 == 0)
-                println(cfg, "Populate " + k);
+                sem.acquire();
+            }
+        }
+        finally {
+            sem.release();
         }
     }
 }