You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/10/11 11:34:30 UTC

[2/3] ignite git commit: IGNITE-9171: SQL: always execute queries in lazy mode. This closes #4514. This closes #4538. This closes #4870.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f228111..9b7d268 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -40,7 +40,6 @@ import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -58,12 +57,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.CompoundLockFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -71,8 +70,10 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable;
 import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
@@ -97,13 +98,13 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.apache.ignite.thread.IgniteThread;
+import org.h2.api.ErrorCode;
 import org.h2.command.Prepared;
 import org.h2.jdbc.JdbcResultSet;
+import org.h2.jdbc.JdbcSQLException;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -123,9 +124,6 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2V
 @SuppressWarnings("ForLoopReplaceableByForEach")
 public class GridMapQueryExecutor {
     /** */
-    public static final boolean FORCE_LAZY = IgniteSystemProperties.getBoolean(IGNITE_SQL_FORCE_LAZY_RESULT_SET);
-
-    /** */
     private IgniteLogger log;
 
     /** */
@@ -149,8 +147,8 @@ public class GridMapQueryExecutor {
     /** Busy lock for lazy workers. */
     private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock();
 
-    /** Lazy worker stop guard. */
-    private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean();
+    /** Stop guard. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
 
     /**
      * @param busyLock Busy lock.
@@ -207,18 +205,21 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * Cancel active lazy queries and prevent submit of new queries.
+     * Stop the map query executor and clean up its resources.
      */
-    public void cancelLazyWorkers() {
-        if (!lazyWorkerStopGuard.compareAndSet(false, true))
+    public void stop() {
+        if (!stopGuard.compareAndSet(false, true))
             return;
 
-        lazyWorkerBusyLock.block();
+        for (MapNodeResults res : qryRess.values())
+            res.cancelAll();
 
-        for (MapQueryLazyWorker worker : lazyWorkers.values())
-            worker.stop(false);
+        for (MapQueryLazyWorker w : lazyWorkers.values())
+            w.stop(true);
 
-        lazyWorkers.clear();
+        lazyWorkerBusyLock.block();
+
+        assert lazyWorkers.isEmpty() : "Not cleaned lazy workers: " + lazyWorkers.size();
     }
 
     /**
@@ -259,7 +260,7 @@ public class GridMapQueryExecutor {
      * @return Busy lock for lazy workers to guard their operations with.
      */
     GridSpinBusyLock busyLock() {
-        return busyLock;
+        return lazyWorkerBusyLock;
     }
 
     /**
@@ -554,6 +555,7 @@ public class GridMapQueryExecutor {
     /**
      * @param node Node.
      * @param req Query request.
+     * @throws IgniteCheckedException On error.
      */
     private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
         int[] qryParts = req.queryPartitions();
@@ -566,10 +568,14 @@ public class GridMapQueryExecutor {
             req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
             req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
 
+        final GridDhtTxLocalAdapter tx;
+
+        GridH2SelectForUpdateTxDetails txReq = req.txDetails();
+
         final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
         final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
         final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
-        final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
+        final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY) && txReq == null;
 
         final List<Integer> cacheIds = req.caches();
 
@@ -578,10 +584,6 @@ public class GridMapQueryExecutor {
 
         final Object[] params = req.parameters();
 
-        final GridDhtTxLocalAdapter tx;
-
-        GridH2SelectForUpdateTxDetails txReq = req.txDetails();
-
         try {
             if (txReq != null) {
                 // Prepare to run queries.
@@ -736,7 +738,11 @@ public class GridMapQueryExecutor {
      * @param parts Explicit partitions for current node.
      * @param pageSize Page size.
      * @param distributedJoinMode Query distributed join mode.
-     * @param lazy Streaming flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param replicated Replicated flag.
+     * @param timeout Query timeout.
+     * @param params Query params.
+     * @param lazy Lazy query execution flag.
      * @param mvccSnapshot MVCC snapshot.
      * @param tx Transaction.
      * @param txDetails TX details, if it's a {@code FOR UPDATE} request, or {@code null}.
@@ -765,75 +771,24 @@ public class GridMapQueryExecutor {
         @Nullable final GridH2SelectForUpdateTxDetails txDetails,
         @Nullable final CompoundLockFuture lockFut,
         @Nullable final AtomicInteger runCntr) {
-        MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
-
         // In presence of TX, we also must always have matching details.
         assert tx == null || txDetails != null;
 
-        boolean inTx = (tx != null);
-
-        if (lazy && worker == null) {
-            // Lazy queries must be re-submitted to dedicated workers.
-            MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
-            worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
-
-            worker.submit(new Runnable() {
-                @Override public void run() {
-                    onQueryRequest0(
-                        node,
-                        reqId,
-                        segmentId,
-                        schemaName,
-                        qrys,
-                        cacheIds,
-                        topVer,
-                        partsMap,
-                        parts,
-                        pageSize,
-                        distributedJoinMode,
-                        enforceJoinOrder,
-                        replicated,
-                        timeout,
-                        params,
-                        true,
-                        mvccSnapshot,
-                        tx,
-                        txDetails,
-                        lockFut,
-                        runCntr);
-                }
-            });
-
-            if (lazyWorkerBusyLock.enterBusy()) {
-                try {
-                    MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
-
-                    if (oldWorker != null)
-                        oldWorker.stop(false);
+        assert !lazy || txDetails == null : "Lazy execution of SELECT FOR UPDATE queries is not supported.";
 
-                    IgniteThread thread = new IgniteThread(worker);
-
-                    thread.start();
-                }
-                finally {
-                    lazyWorkerBusyLock.leaveBusy();
-                }
-            }
-            else
-                log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']');
+        boolean inTx = (tx != null);
 
-            return;
-        }
+        MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
 
-        if (lazy && txDetails != null)
-            throw new IgniteSQLException("Lazy execution of SELECT FOR UPDATE queries is not supported.");
+        if (lazy && worker == null)
+            worker = createLazyWorker(node, reqId, segmentId);
 
         // Prepare to run queries.
         GridCacheContext<?, ?> mainCctx = mainCacheContext(cacheIds);
 
         MapNodeResults nodeRess = resultsForNode(node.id());
 
-        MapQueryResults qr = null;
+        MapQueryResults qryResults = null;
 
         List<GridReservable> reserved = new ArrayList<>();
 
@@ -847,7 +802,7 @@ public class GridMapQueryExecutor {
                 if (!F.isEmpty(err)) {
                     // Unregister lazy worker because re-try may never reach this node again.
                     if (lazy)
-                        stopAndUnregisterCurrentLazyWorker();
+                        worker.stop(false);
 
                     sendRetry(node, reqId, segmentId, err);
 
@@ -855,10 +810,7 @@ public class GridMapQueryExecutor {
                 }
             }
 
-            qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, MapQueryLazyWorker.currentWorker(), inTx);
-
-            if (nodeRess.put(reqId, segmentId, qr) != null)
-                throw new IllegalStateException();
+            qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, worker, inTx);
 
             // Prepare query context.
             GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
@@ -872,186 +824,207 @@ public class GridMapQueryExecutor {
                 .pageSize(pageSize)
                 .topologyVersion(topVer)
                 .reservations(reserved)
-                .mvccSnapshot(mvccSnapshot)
-                .lazyWorker(worker);
-
-            Connection conn = h2.connectionForSchema(schemaName);
-
-            H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
-
-            GridH2QueryContext.set(qctx);
+                .mvccSnapshot(mvccSnapshot);
 
             // qctx is set, we have to release reservations inside of it.
             reserved = null;
 
-            try {
-                if (nodeRess.cancelled(reqId)) {
-                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
+            if (worker != null)
+                worker.queryContext(qctx);
 
-                    nodeRess.cancelRequest(reqId);
+            GridH2QueryContext.set(qctx);
 
-                    throw new QueryCancelledException();
-                }
+            if (nodeRess.put(reqId, segmentId, qryResults) != null)
+                throw new IllegalStateException();
 
-                // Run queries.
-                int qryIdx = 0;
+            Connection conn = h2.connectionForSchema(schemaName);
 
-                boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+            H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder, lazy);
 
-                for (GridCacheSqlQuery qry : qrys) {
-                    ResultSet rs = null;
+            if (nodeRess.cancelled(reqId)) {
+                GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
 
-                    boolean removeMapping = false;
+                nodeRess.cancelRequest(reqId);
 
-                    // If we are not the target node for this replicated query, just ignore it.
-                    if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
-                        String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params));
+                throw new QueryCancelledException();
+            }
 
-                        PreparedStatement stmt;
+            // Run queries.
+            int qryIdx = 0;
 
-                        try {
-                            stmt = h2.prepareStatement(conn, sql, true);
-                        }
-                        catch (SQLException e) {
-                            throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
-                        }
+            boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
 
-                        Prepared p = GridSqlQueryParser.prepared(stmt);
+            for (GridCacheSqlQuery qry : qrys) {
+                ResultSet rs = null;
 
-                        if (GridSqlQueryParser.isForUpdateQuery(p)) {
-                            sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx);
-                            stmt = h2.prepareStatement(conn, sql, true);
-                        }
+                boolean removeMapping = false;
 
-                        h2.bindParameters(stmt, params0);
+                // If we are not the target node for this replicated query, just ignore it.
+                if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
+                    String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params));
 
-                        int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
+                    PreparedStatement stmt;
 
-                        rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx));
+                    try {
+                        stmt = h2.prepareStatement(conn, sql, true);
+                    }
+                    catch (SQLException e) {
+                        throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
+                    }
 
-                        if (inTx) {
-                            ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
-                                ctx.localNodeId(),
-                                txDetails.version(),
-                                mvccSnapshot,
-                                txDetails.threadId(),
-                                IgniteUuid.randomUuid(),
-                                txDetails.miniId(),
-                                parts,
-                                tx,
-                                opTimeout,
-                                mainCctx,
-                                rs
-                            );
+                    Prepared p = GridSqlQueryParser.prepared(stmt);
 
-                            if (lockFut != null)
-                                lockFut.register(enlistFut);
+                    if (GridSqlQueryParser.isForUpdateQuery(p)) {
+                        sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx);
+                        stmt = h2.prepareStatement(conn, sql, true);
+                    }
 
-                            enlistFut.init();
+                    h2.bindParameters(stmt, params0);
 
-                            enlistFut.get();
+                    int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
 
-                            rs.beforeFirst();
-                        }
+                    rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qryResults.queryCancel(qryIdx));
 
-                        if (evt) {
-                            ctx.event().record(new CacheQueryExecutedEvent<>(
-                                node,
-                                "SQL query executed.",
-                                EVT_CACHE_QUERY_EXECUTED,
-                                CacheQueryType.SQL.name(),
-                                mainCctx.name(),
-                                null,
-                                qry.query(),
-                                null,
-                                null,
-                                params,
-                                node.id(),
-                                null));
-                        }
+                    if (inTx) {
+                        ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
+                            ctx.localNodeId(),
+                            txDetails.version(),
+                            mvccSnapshot,
+                            txDetails.threadId(),
+                            IgniteUuid.randomUuid(),
+                            txDetails.miniId(),
+                            parts,
+                            tx,
+                            opTimeout,
+                            mainCctx,
+                            rs
+                        );
+
+                        if (lockFut != null)
+                            lockFut.register(enlistFut);
+
+                        enlistFut.init();
+
+                        enlistFut.get();
+
+                        rs.beforeFirst();
+                    }
 
-                        assert rs instanceof JdbcResultSet : rs.getClass();
+                    if (evt) {
+                        ctx.event().record(new CacheQueryExecutedEvent<>(
+                            node,
+                            "SQL query executed.",
+                            EVT_CACHE_QUERY_EXECUTED,
+                            CacheQueryType.SQL.name(),
+                            mainCctx.name(),
+                            null,
+                            qry.query(),
+                            null,
+                            null,
+                            params,
+                            node.id(),
+                            null));
                     }
 
-                    qr.addResult(qryIdx, qry, node.id(), rs, params);
+                    assert rs instanceof JdbcResultSet : rs.getClass();
+                }
 
-                    if (qr.cancelled()) {
-                        qr.result(qryIdx).close();
+                qryResults.addResult(qryIdx, qry, node.id(), rs, params);
 
-                        throw new QueryCancelledException();
-                    }
+                if (qryResults.cancelled()) {
+                    qryResults.result(qryIdx).close();
 
-                    if (inTx) {
-                        if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) {
-                            if (removeMapping = tx.empty() && !tx.queryEnlisted())
-                                tx.rollbackAsync().get();
-                        }
+                    throw new QueryCancelledException();
+                }
+
+                if (inTx) {
+                    if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) {
+                        if (removeMapping = tx.empty() && !tx.queryEnlisted())
+                            tx.rollbackAsync().get();
                     }
+                }
 
-                    // Send the first page.
-                    if (lockFut == null)
-                        sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping);
-                    else {
-                        GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping);
-
-                        if (msg != null) {
-                            lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
-                                @Override public void apply(IgniteInternalFuture<Void> future) {
-                                    try {
-                                        if (node.isLocal())
-                                            h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
-                                        else
-                                            ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
-                                    }
-                                    catch (Exception e) {
-                                        U.error(log, e);
-                                    }
+                // Send the first page.
+                if (lockFut == null)
+                    sendNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping);
+                else {
+                    GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping);
+
+                    if (msg != null) {
+                        lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
+                            @Override public void apply(IgniteInternalFuture<Void> future) {
+                                try {
+                                    if (node.isLocal())
+                                        h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+                                    else
+                                        ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
                                 }
-                            });
-                        }
+                                catch (Exception e) {
+                                    U.error(log, e);
+                                }
+                            }
+                        });
                     }
-
-                    qryIdx++;
                 }
 
-                // All request results are in the memory in result set already, so it's ok to release partitions.
-                if (!lazy)
-                    releaseReservations();
+                qryIdx++;
             }
-            catch (Throwable e){
+
+            // All request results are already in memory in the result set, so it's OK to release partitions.
+            if (!lazy)
                 releaseReservations();
+            else if (!qryResults.isAllClosed()) {
+                if (MapQueryLazyWorker.currentWorker() == null) {
+                    final ObjectPoolReusable<H2ConnectionWrapper> detachedConn = h2.detachConnection();
 
-                throw e;
+                    worker.start(H2Utils.session(conn), detachedConn);
+
+                    GridH2QueryContext.clearThreadLocal();
+                }
             }
+            else
+                unregisterLazyWorker(worker);
         }
         catch (Throwable e) {
-            if (qr != null) {
-                nodeRess.remove(reqId, segmentId, qr);
+            if (qryResults != null) {
+                nodeRess.remove(reqId, segmentId, qryResults);
 
-                qr.cancel(false);
+                qryResults.close();
             }
+            else
+                releaseReservations();
 
-            // Unregister worker after possible cancellation.
+            // Stop and unregister worker after possible cancellation.
             if (lazy)
-                stopAndUnregisterCurrentLazyWorker();
+                worker.stop(false);
 
-            GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
+            if (e instanceof QueryCancelledException)
+                sendError(node, reqId, e);
+            else {
+                JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
 
-            if (retryErr != null) {
-                final String retryCause = String.format(
-                    "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
-                    "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
-                );
+                if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+                    sendError(node, reqId, new QueryCancelledException());
+                else {
+                    GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
 
-                sendRetry(node, reqId, segmentId, retryCause);
-            }
-            else {
-                U.error(log, "Failed to execute local query.", e);
+                    if (retryErr != null) {
+                        final String retryCause = String.format(
+                            "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
+                                "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
+                        );
 
-                sendError(node, reqId, e);
+                        sendRetry(node, reqId, segmentId, retryCause);
+                    }
+                    else {
+                        U.error(log, "Failed to execute local query.", e);
+
+                        sendError(node, reqId, e);
 
-                if (e instanceof Error)
-                    throw (Error)e;
+                        if (e instanceof Error)
+                            throw (Error)e;
+                    }
+                }
             }
         }
         finally {
@@ -1060,10 +1033,25 @@ public class GridMapQueryExecutor {
                 for (int i = 0; i < reserved.size(); i++)
                     reserved.get(i).release();
             }
+
+            if (MapQueryLazyWorker.currentWorker() == null && GridH2QueryContext.get() != null)
+                GridH2QueryContext.clearThreadLocal();
         }
     }
 
     /**
+     * @param node The node that sent the map query request.
+     * @param reqId Request ID.
+     * @param segmentId Segment ID.
+     * @return Lazy worker.
+     */
+    private MapQueryLazyWorker createLazyWorker(ClusterNode node, long reqId, int segmentId) {
+        MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
+
+        return  new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
+    }
+
+    /**
      * @param cacheIds Cache ids.
      * @return Id of the first cache in list, or {@code null} if list is empty.
      */
@@ -1088,6 +1076,7 @@ public class GridMapQueryExecutor {
     /**
      * @param node Node.
      * @param req DML request.
+     * @throws IgniteCheckedException On error.
      */
     private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException {
         int[] parts = req.queryPartitions();
@@ -1255,24 +1244,34 @@ public class GridMapQueryExecutor {
             return;
         }
 
-        final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
+        final MapQueryResults qryResults = nodeRess.get(req.queryRequestId(), req.segmentId());
 
-        if (qr == null)
+        if (qryResults == null)
             sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
-        else if (qr.cancelled())
+        else if (qryResults.cancelled())
             sendError(node, req.queryRequestId(), new QueryCancelledException());
         else {
-            MapQueryLazyWorker lazyWorker = qr.lazyWorker();
+            MapQueryLazyWorker lazyWorker = qryResults.lazyWorker();
 
             if (lazyWorker != null) {
                 lazyWorker.submit(new Runnable() {
                     @Override public void run() {
-                        sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false);
+                        try {
+                            sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false);
+                        }
+                        catch (Throwable e) {
+                            JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
+
+                            if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+                                sendError(node, qryResults.queryRequestId(), new QueryCancelledException());
+                            else
+                                throw e;
+                        }
                     }
                 });
             }
             else
-                sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false);
+                sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false);
         }
     }
 
@@ -1287,8 +1286,14 @@ public class GridMapQueryExecutor {
      * @return Next page.
      * @throws IgniteCheckedException If failed.
      */
-    private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
-        int pageSize, boolean removeMapping) throws IgniteCheckedException {
+    private GridQueryNextPageResponse prepareNextPage(
+        MapNodeResults nodeRess,
+        ClusterNode node,
+        MapQueryResults qr,
+        int qry,
+        int segmentId,
+        int pageSize,
+        boolean removeMapping) throws IgniteCheckedException {
         MapQueryResult res = qr.result(qry);
 
         assert res != null;
@@ -1309,8 +1314,11 @@ public class GridMapQueryExecutor {
                 nodeRess.remove(qr.queryRequestId(), segmentId, qr);
 
                 // Release reservations if the last page fetched, all requests are closed and this is a lazy worker.
-                if (MapQueryLazyWorker.currentWorker() != null)
+                if (qr.lazyWorker() != null) {
                     releaseReservations();
+
+                    qr.lazyWorker().stop(false);
+                }
             }
         }
 
@@ -1342,8 +1350,14 @@ public class GridMapQueryExecutor {
      * @param removeMapping Remove mapping flag.
      */
     @SuppressWarnings("unchecked")
-    private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
-        int pageSize, boolean removeMapping) {
+    private void sendNextPage(
+        MapNodeResults nodeRess,
+        ClusterNode node,
+        MapQueryResults qr,
+        int qry,
+        int segmentId,
+        int pageSize,
+        boolean removeMapping) {
         try {
             GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, removeMapping);
 
@@ -1365,6 +1379,7 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param reqId Request ID.
      * @param segmentId Index segment ID.
+     * @param retryCause Description of the retry cause.
      */
     private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) {
         try {
@@ -1401,25 +1416,11 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread).
-     */
-    public void stopAndUnregisterCurrentLazyWorker() {
-        MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
-
-        if (worker != null) {
-            worker.stop(false);
-
-            // Just stop is not enough as worker may be registered, but not started due to exception.
-            unregisterLazyWorker(worker);
-        }
-    }
-
-    /**
      * Unregister lazy worker.
      *
      * @param worker Worker.
      */
-    public void unregisterLazyWorker(MapQueryLazyWorker worker) {
+    void unregisterLazyWorker(MapQueryLazyWorker worker) {
         lazyWorkers.remove(worker.key(), worker);
     }
 
@@ -1429,4 +1430,17 @@ public class GridMapQueryExecutor {
     public int registeredLazyWorkers() {
         return lazyWorkers.size();
     }
+
+    /**
+     * @param worker Worker to register.
+     */
+    void registerLazyWorker(MapQueryLazyWorker worker) {
+        MapQueryLazyWorker oldWorker = lazyWorkers.put(worker.key(), worker);
+
+        if (oldWorker != null) {
+            log.warning("Duplicates lazy worker: [key=" + worker.key() + ']');
+
+            oldWorker.stop(false);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 0cb986b..217cfad 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.RandomAccess;
 import java.util.UUID;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -72,9 +73,11 @@ public class GridResultPage {
             Collection<?> plainRows = res.plainRows();
 
             if (plainRows != null) {
+                assert plainRows instanceof RandomAccess : "instance of " + plainRows.getClass();
+
                 rowsInPage = plainRows.size();
 
-                if (rowsInPage == 0 || ((ArrayList<Value[]>)plainRows).get(0).length == res.columns())
+                if (rowsInPage == 0 || ((List<Value[]>)plainRows).get(0).length == res.columns())
                     rows = (Iterator<Value[]>)plainRows.iterator();
                 else {
                     // If it's a result of SELECT FOR UPDATE (we can tell by difference in number

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
index 48116d3..8f8553a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -17,12 +17,11 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
-import java.util.concurrent.ConcurrentHashMap;
-
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
@@ -86,10 +85,10 @@ class MapNodeResults {
     public void cancelRequest(long reqId) {
         for (MapRequestKey key : res.keySet()) {
             if (key.requestId() == reqId) {
-                MapQueryResults removed = res.remove(key);
+                final MapQueryResults removed = res.remove(key);
 
                 if (removed != null)
-                    removed.cancel(true);
+                    removed.cancel();
             }
         }
 
@@ -144,7 +143,7 @@ class MapNodeResults {
      */
     public void cancelAll() {
         for (MapQueryResults ress : res.values())
-            ress.cancel(true);
+            ress.cancel();
 
         // Cancel update requests
         for (GridQueryCancel upd: updCancels.values())

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
index 98f3df9..1cbab19 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
@@ -20,25 +20,41 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.h2.engine.Session;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+
 /**
  * Worker for lazy query execution.
  */
 public class MapQueryLazyWorker extends GridWorker {
+    /** Timeout for polling the task queue, in milliseconds. */
+    private static final int POLL_TASK_TIMEOUT_MS = 1000;
+
     /** Lazy thread flag. */
     private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>();
 
     /** Active lazy worker count (for testing purposes). */
     private static final LongAdder ACTIVE_CNT = new LongAdder();
 
+    /** Mutex used to synchronize worker start/stop. */
+    private final Object mux = new Object();
+
     /** Task to be executed. */
     private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
 
@@ -51,8 +67,14 @@ public class MapQueryLazyWorker extends GridWorker {
     /** Latch decremented when worker finishes. */
     private final CountDownLatch stopLatch = new CountDownLatch(1);
 
-    /** Map query result. */
-    private volatile MapQueryResult res;
+    /** Query context. */
+    private GridH2QueryContext qctx;
+
+    /** Flag indicating that the worker has been started. */
+    private boolean started;
+
+    /** Detached connection. */
+    private ObjectPoolReusable<H2ConnectionWrapper> detached;
 
     /**
      * Constructor.
@@ -70,38 +92,106 @@ public class MapQueryLazyWorker extends GridWorker {
         this.exec = exec;
     }
 
+    /**
+     * Start lazy worker for half-processed query.
+     * In this case we have to detach H2 connection from current thread and use it for current query processing.
+     * Table locks must also be transferred from the QUERY_POOL thread to the lazy thread.
+     *
+     * @param ses H2 Session.
+     * @param detached H2 connection detached from current thread.
+     * @throws QueryCancelledException If the query is cancelled while the worker is starting.
+     */
+    void start(Session ses, ObjectPoolReusable<H2ConnectionWrapper> detached) throws QueryCancelledException {
+        synchronized (mux) {
+            if (!exec.busyLock().enterBusy()) {
+                log.warning("Lazy worker isn't started. Node is stopped [key=" + key + ']');
+
+                return;
+            }
+
+            try {
+                if (started)
+                    return;
+
+                if (isCancelled) {
+                    if (detached != null)
+                        detached.recycle();
+
+                    throw new QueryCancelledException();
+                }
+
+                if (ses != null)
+                    lazyTransferStart(ses);
+
+                this.detached = detached;
+
+                exec.registerLazyWorker(this);
+
+                IgniteThread thread = new IgniteThread(this);
+
+                started = true;
+
+                thread.start();
+            }
+            finally {
+                exec.busyLock().leaveBusy();
+            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
         LAZY_WORKER.set(this);
 
         ACTIVE_CNT.increment();
 
+        boolean lockBusy = false;
+
         try {
+            if (qctx != null)
+                GridH2QueryContext.set(qctx);
+
+            if(detached != null)
+                lazyTransferFinish(H2Utils.session(detached.object().connection()));
+
             while (!isCancelled()) {
-                Runnable task = tasks.take();
+                Runnable task = tasks.poll(POLL_TASK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
 
                 if (task != null) {
-                    if (!exec.busyLock().enterBusy())
-                        return;
-
                     try {
                         task.run();
                     }
+                    catch (Throwable t) {
+                        log.warning("Lazy task error", t);
+                    }
+                }
+                else {
+                    try {
+                        lockBusy = false;
+
+                        if (!exec.busyLock().enterBusy()) {
+                            log.info("Stop lazy worker [key=" + key + ']');
+
+                            return;
+                        }
+
+                        lockBusy = true;
+                    }
                     finally {
-                        exec.busyLock().leaveBusy();
+                        if (lockBusy)
+                            exec.busyLock().leaveBusy();
                     }
                 }
             }
         }
         finally {
-            if (res != null)
-                res.close();
+            exec.unregisterLazyWorker(this);
 
             LAZY_WORKER.set(null);
 
             ACTIVE_CNT.decrement();
 
-            exec.unregisterLazyWorker(this);
+            stopLatch.countDown();
         }
     }
 
@@ -111,6 +201,9 @@ public class MapQueryLazyWorker extends GridWorker {
      * @param task Task to be executed.
      */
     public void submit(Runnable task) {
+        if (isCancelled)
+            return;
+
         tasks.add(task);
     }
 
@@ -125,45 +218,76 @@ public class MapQueryLazyWorker extends GridWorker {
      * Stop the worker.
      * @param nodeStop Node is stopping.
      */
-    public void stop(final boolean nodeStop) {
-        if (MapQueryLazyWorker.currentWorker() == null)
-            submit(new Runnable() {
-                @Override public void run() {
-                    stop(nodeStop);
-                }
-            });
-        else {
-            GridH2QueryContext qctx = GridH2QueryContext.get();
-
-            if (qctx != null) {
+    private void stop0(boolean nodeStop) {
+        synchronized (mux) {
+            if (qctx != null && qctx.distributedJoinMode() == OFF && !qctx.isCleared())
                 qctx.clearContext(nodeStop);
 
-                GridH2QueryContext.clearThreadLocal();
+            if (detached != null) {
+                detached.recycle();
+
+                detached = null;
             }
 
             isCancelled = true;
 
-            stopLatch.countDown();
+            mux.notifyAll();
         }
     }
 
     /**
-     * Await worker stop.
+     * @param task Stop task.
      */
-    public void awaitStop() {
-        try {
-            U.await(stopLatch);
+    public void submitStopTask(Runnable task) {
+        synchronized (mux) {
+            if (LAZY_WORKER.get() != null)
+                task.run();
+            else
+                submit(task);
         }
-        catch (IgniteInterruptedCheckedException e) {
-            throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
+    }
+
+    /**
+     * Stop the worker.
+     * @param nodeStop Node is stopping.
+     */
+    public void stop(final boolean nodeStop) {
+        synchronized (mux) {
+            if (isCancelled)
+                return;
+
+            if (started && currentWorker() == null) {
+                submit(new Runnable() {
+                    @Override public void run() {
+                        stop0(nodeStop);
+                    }
+                });
+
+                awaitStop();
+            }
+            else if (currentWorker() != null)
+                stop0(nodeStop);
         }
     }
 
     /**
-     * @param res Map query result.
+     * Await worker stop.
      */
-    public void result(MapQueryResult res) {
-        this.res = res;
+    private void awaitStop() {
+        synchronized (mux) {
+            try {
+                if (!isCancelled)
+                    mux.wait();
+
+                U.await(stopLatch);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
     /**
@@ -181,6 +305,13 @@ public class MapQueryLazyWorker extends GridWorker {
     }
 
     /**
+     * @param qctx Query context.
+     */
+    public void queryContext(GridH2QueryContext qctx) {
+        this.qctx = qctx;
+    }
+
+    /**
      * Construct worker name.
      *
      * @param instanceName Instance name.
@@ -191,4 +322,32 @@ public class MapQueryLazyWorker extends GridWorker {
         return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" +
             key.segment();
     }
+
+    /**
+     * Start session transfer to lazy thread.
+     *
+     * @param ses Session.
+     */
+    private static void lazyTransferStart(Session ses) {
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
+        assert qctx != null;
+
+        for(GridH2Table tbl : qctx.lockedTables())
+            tbl.onLazyTransferStarted(ses);
+    }
+
+    /**
+     * Finish session transfer to lazy thread.
+     *
+     * @param ses Session.
+     */
+    private static void lazyTransferFinish(Session ses) {
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
+        assert qctx != null;
+
+        for(GridH2Table tbl : qctx.lockedTables())
+            tbl.onLazyTransferFinished(ses);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index fb928c4..5a0c410 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -22,6 +22,7 @@ import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -60,6 +61,9 @@ class MapQueryResult {
         }
     }
 
+    /** Logger. */
+    private final IgniteLogger log;
+
     /** Indexing. */
     private final IgniteH2Indexing h2;
 
@@ -96,26 +100,23 @@ class MapQueryResult {
     /** */
     private final Object[] params;
 
-    /** Lazy worker. */
-    private final MapQueryLazyWorker lazyWorker;
-
     /**
+     * @param h2 H2 indexing.
      * @param rs Result set.
      * @param cctx Cache context.
      * @param qrySrcNodeId Query source node.
      * @param qry Query.
      * @param params Query params.
-     * @param lazyWorker Lazy worker.
      */
     MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable GridCacheContext cctx,
-        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
+        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
+        this.log = h2.kernalContext().log(MapQueryResult.class);
         this.h2 = h2;
         this.cctx = cctx;
         this.qry = qry;
         this.params = params;
         this.qrySrcNodeId = qrySrcNodeId;
         this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
-        this.lazyWorker = lazyWorker;
 
         if (rs != null) {
             this.rs = rs;
@@ -174,8 +175,6 @@ class MapQueryResult {
      * @return {@code true} If there are no more rows available.
      */
     synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
-        assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
-
         if (closed)
             return true;
 
@@ -259,30 +258,13 @@ class MapQueryResult {
      * Close the result.
      */
     public void close() {
-        if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) {
-            lazyWorker.submit(new Runnable() {
-                @Override public void run() {
-                    close();
-                }
-            });
-
-            lazyWorker.awaitStop();
-
-            return;
-        }
-
         synchronized (this) {
-            assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
-
             if (closed)
                 return;
 
             closed = true;
 
-            U.closeQuiet(rs);
-
-            if (lazyWorker != null)
-                lazyWorker.stop(false);
+            U.close(rs, log);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index 76527bc..b13137c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -30,7 +30,7 @@ import org.jetbrains.annotations.Nullable;
  * Mapper query results.
  */
 class MapQueryResults {
-    /** H@ indexing. */
+    /** H2 indexing. */
     private final IgniteH2Indexing h2;
 
     /** */
@@ -113,10 +113,7 @@ class MapQueryResults {
      * @param params Query arguments.
      */
     void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
-        MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params, lazyWorker);
-
-        if (lazyWorker != null)
-            lazyWorker.result(res);
+        MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params);
 
         if (!results.compareAndSet(qry, null, res))
             throw new IllegalStateException();
@@ -139,28 +136,37 @@ class MapQueryResults {
     /**
      * Cancels the query.
      */
-    void cancel(boolean forceQryCancel) {
+    void cancel() {
         if (cancelled)
             return;
 
         cancelled = true;
 
         for (int i = 0; i < results.length(); i++) {
-            MapQueryResult res = results.get(i);
+            GridQueryCancel cancel = cancels[i];
 
-            if (res != null) {
-                res.close();
+            if (cancel != null)
+                cancel.cancel();
+        }
 
-                continue;
-            }
+        if (lazyWorker == null)
+            close();
+        else {
+            lazyWorker.submitStopTask(this::close);
 
-            // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable).
-            if (forceQryCancel) {
-                GridQueryCancel cancel = cancels[i];
+            lazyWorker.stop(false);
+        }
+    }
 
-                if (cancel != null)
-                    cancel.cancel();
-            }
+    /**
+     * Closes every non-null query result held by this object.
+     */
+    public void close() {
+        for (int i = 0; i < results.length(); i++) {
+            MapQueryResult res = results.get(i);
+
+            if (res != null)
+                res.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
index a112969..a991530 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java
@@ -96,7 +96,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
         int partsFilled = fillAllPartitions(cache, aff);
 
         SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
-            .setLazy(true)
             .setPageSize(1);
 
         FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);
@@ -143,7 +142,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT
         int partsFilled = fillAllPartitions(cache, aff);
 
         SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person")
-            .setLazy(true)
             .setPageSize(1);
 
         FieldsQueryCursor<List<?>> qryCursor = cache.query(qry);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
index 59be138..24e2fb2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
@@ -121,12 +121,15 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
     private static int getStatementCacheSize(GridQueryProcessor qryProcessor) {
         IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProcessor, GridQueryProcessor.class, "idx");
 
-        ConcurrentMap<Thread, H2ConnectionWrapper> conns = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns");
+        ConcurrentMap<Thread, ConcurrentMap<H2ConnectionWrapper, Boolean>> conns =
+            GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns");
 
         int cntr = 0;
 
-        for (H2ConnectionWrapper w : conns.values())
-            cntr += w.statementCacheSize();
+        for (ConcurrentMap<H2ConnectionWrapper, Boolean> connPerThread: conns.values()) {
+            for (H2ConnectionWrapper w : connPerThread.keySet())
+                cntr += w.statementCacheSize();
+        }
 
         return cntr;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
index 56fd7b8..8542f43 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
@@ -177,7 +177,6 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
     private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit,
                                  boolean timeout) throws Exception {
         try (Ignite client = startGrid("client")) {
-
             IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
 
             assertEquals(0, cache.localSize());
@@ -204,7 +203,8 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
                 qry.setTimeout(timeoutUnits, timeUnit);
 
                 cursor = cache.query(qry);
-            } else {
+            }
+            else {
                 cursor = cache.query(qry);
 
                 client.scheduler().runLocal(new Runnable() {
@@ -214,7 +214,7 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
                 }, timeoutUnits, timeUnit);
             }
 
-            try(QueryCursor<List<?>> ignored = cursor) {
+            try (QueryCursor<List<?>> ignored = cursor) {
                 cursor.iterator();
             }
             catch (CacheException ex) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index bad5303..3beebff 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
+import org.apache.ignite.testframework.GridTestUtils;
 
 /**
  * Test for distributed queries with node restarts.
@@ -101,11 +102,11 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
 
         assertEquals(broadcastQry, plan.contains("batched:broadcast"));
 
-        final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll();
+        final List<List<?>> goldenRes = grid(0).cache("pu").query(qry0).getAll();
 
         Thread.sleep(3000);
 
-        assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll());
+        assertEquals(goldenRes, grid(0).cache("pu").query(qry0).getAll());
 
         final SqlFieldsQuery qry1;
 
@@ -122,7 +123,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
 
         final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll();
 
-        assertFalse(pRes.isEmpty());
+        assertFalse(goldenRes.isEmpty());
         assertFalse(rRes.isEmpty());
 
         final AtomicInteger qryCnt = new AtomicInteger();
@@ -161,9 +162,12 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
                             qry.setPageSize(smallPageSize ? 30 : 1000);
 
                             try {
-                                assertEquals(pRes, cache.query(qry).getAll());
+                                assertEquals(goldenRes, cache.query(qry).getAll());
                             }
                             catch (CacheException e) {
+                                if (!smallPageSize)
+                                    log.error("Unexpected exception at the test", e);
+
                                 assertTrue("On large page size must retry.", smallPageSize);
 
                                 boolean failedOnRemoteFetch = false;
@@ -263,7 +267,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa
             }
         }, restartThreadsNum, "restart-thread");
 
-        Thread.sleep(duration);
+        GridTestUtils.waitForCondition(() -> fail.get(), duration);
 
         info("Stopping...");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index 072f1ab..4d02b2e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -627,6 +627,8 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
      * @throws Exception If failed.
      */
     public void testQueryConsistencyMultithreaded() throws Exception {
+        final int KEY_COUNT = 5000;
+
         // Start complex topology.
         ignitionStart(serverConfiguration(1));
         ignitionStart(serverConfiguration(2));
@@ -638,7 +640,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
 
         run(cli, createSql);
 
-        put(cli, 0, 5000);
+        put(cli, 0, KEY_COUNT);
 
         final AtomicBoolean stopped = new AtomicBoolean();
 
@@ -696,7 +698,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
                     List<Cache.Entry<BinaryObject, BinaryObject>> res = cache.query(
                         new SqlQuery<BinaryObject, BinaryObject>(valTypeName, "from " + TBL_NAME)).getAll();
 
-                    assertEquals(5000, res.size());
+                    assertEquals(KEY_COUNT, res.size());
                 }
 
                 return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
index 7713004..fe45ed6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
@@ -160,7 +160,7 @@ public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest {
                 Map<Thread, ?> conns = perThreadConnections(i);
 
                 for(Thread t : conns.keySet())
-                    log.error("+++ Connection is not closed for thread: " + t.getName());
+                    log.error("Connection is not closed for thread: " + t.getName());
             }
 
             fail("H2 JDBC connections leak detected. See the log above.");

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
index d5cc0eb..140eb6e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
@@ -17,13 +17,21 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
@@ -31,16 +39,11 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
 /**
  * Tests for lazy query execution.
  */
 public class LazyQuerySelfTest extends GridCommonAbstractTest {
-    /** Keys ocunt. */
+    /** Keys count. */
     private static final int KEY_CNT = 200;
 
     /** Base query argument. */
@@ -94,6 +97,91 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests that DDL operations on a table complete while the table is under heavy concurrent query load.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTableWriteLockStarvation() throws Exception {
+        final Ignite srv = startGrid(1);
+
+        srv.createCache(cacheConfiguration(4));
+        populateBaseQueryData(srv);
+
+        final AtomicBoolean end = new AtomicBoolean(false);
+        final int qryThreads = 10;
+        final CountDownLatch latch = new CountDownLatch(qryThreads);
+
+        // Do many concurrent queries.
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                latch.countDown();
+
+                while(!end.get()) {
+                    FieldsQueryCursor<List<?>> cursor = execute(srv, query(0)
+                        .setPageSize(PAGE_SIZE_SMALL));
+
+                    cursor.getAll();
+                }
+            }
+        }, qryThreads, "usr-qry");
+
+        latch.await();
+        Thread.sleep(500);
+
+        try {
+            execute(srv, new SqlFieldsQuery("CREATE INDEX PERSON_NAME ON Person (name asc)")).getAll();
+            execute(srv, new SqlFieldsQuery("DROP INDEX PERSON_NAME")).getAll();
+        }
+        finally {
+            // Always stop the query threads, even if a DDL statement failed.
+            end.set(true);
+        }
+
+        fut.get();
+    }
+
+    /**
+     * Tests that reserved partitions are released after a query completes (results are larger than one page).
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePartitionReservationSeveralPagesResults() throws Exception {
+        checkReleasePartitionReservation(PAGE_SIZE_SMALL);
+    }
+
+    /**
+     * Tests that reserved partitions are released after a query completes (results fit on one page).
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePartitionReservationOnePageResults() throws Exception {
+        checkReleasePartitionReservation(KEY_CNT);
+    }
+
+    /**
+     * Runs a query to completion on a two-node grid, then starts a third node and waits for partition map exchange.
+     *
+     * @param pageSize Results page size.
+     * @throws Exception If failed.
+     */
+    public void checkReleasePartitionReservation(int pageSize) throws Exception {
+        Ignite srv1 = startGrid(1);
+        startGrid(2);
+
+        srv1.createCache(cacheConfiguration(1));
+
+        populateBaseQueryData(srv1);
+
+        FieldsQueryCursor<List<?>> cursor = execute(srv1, query(0).setPageSize(pageSize));
+        // Read the whole result so that the query is finished before the topology changes.
+        cursor.getAll();
+        // NOTE(review): presumably the exchange below cannot finish while partitions stay reserved - confirm.
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
      * Check local query execution.
      *
      * @param parallelism Query parallelism.
@@ -151,18 +239,18 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
         assertNoWorkers();
 
         // Test server node leave with active worker.
-        cursor = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+        FieldsQueryCursor<List<?>> cursor2 = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
 
         try {
-            iter = cursor.iterator();
+            Iterator<List<?>> iter2 = cursor2.iterator();
 
             for (int i = 0; i < 30; i++)
-                iter.next();
+                iter2.next();
 
             stopGrid(2);
         }
         finally {
-            cursor.close();
+            cursor2.close();
         }
 
         assertNoWorkers();
@@ -233,7 +321,55 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
             }
         }
 
+        checkHoldLazyQuery(node);
+
+        checkShortLazyQuery(node);
+    }
+
+    /**
+     * Checks that a cursor opened before a burst of concurrent queries can still be read in full afterwards.
+     *
+     * @param node Ignite node.
+     * @throws Exception If failed.
+     */
+    public void checkHoldLazyQuery(Ignite node) throws Exception {
+        List<List<?>> rows = new ArrayList<>();
+        FieldsQueryCursor<List<?>> cursor0 = execute(node, query(BASE_QRY_ARG).setPageSize(PAGE_SIZE_SMALL));
+
+        // Run many concurrent, fully iterated queries while cursor0 has not been read yet.
+        GridTestUtils.runMultiThreaded(new Runnable() {
+            @Override public void run() {
+                for (int i = 0; i < 5; ++i) {
+                    FieldsQueryCursor<List<?>> cursor = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1)
+                        .setPageSize(PAGE_SIZE_SMALL));
+                    cursor.getAll();
+                }
+            }
+        }, 5, "usr-qry");
+
+        for (List<?> row : cursor0)
+            rows.add(row);
+
+        assertBaseQueryResults(rows);
+    }
+
+    /**
+     * Checks a query returning fewer rows than one page: no lazy workers may remain once the iterator is obtained.
+     *
+     * @param node Ignite node.
+     * @throws Exception If failed.
+     */
+    public void checkShortLazyQuery(Ignite node) throws Exception {
+        List<List<?>> rows = new ArrayList<>();
+        FieldsQueryCursor<List<?>> cursor0 = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1).setPageSize(PAGE_SIZE_SMALL));
+        Iterator<List<?>> it = cursor0.iterator();
+
         assertNoWorkers();
+
+        while (it.hasNext())
+            rows.add(it.next());
+
+        assertQueryResults(rows, KEY_CNT - PAGE_SIZE_SMALL + 1);
     }
 
     /**
@@ -267,8 +403,11 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
      * @return Default cache configuration.
      */
     private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism) {
-        return new CacheConfiguration<Long, Person>().setName(CACHE_NAME).setIndexedTypes(Long.class, Person.class)
-            .setQueryParallelism(parallelism);
+        return new CacheConfiguration<Long, Person>()
+            .setName(CACHE_NAME)
+            .setIndexedTypes(Long.class, Person.class)
+            .setQueryParallelism(parallelism)
+            .setAffinity(new RendezvousAffinityFunction(false, 10));
     }
 
     /**
@@ -278,7 +417,7 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
      * @return Query.
      */
     private static SqlFieldsQuery query(long arg) {
-        return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= ?").setArgs(arg);
+        return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= " + arg);
     }
 
     /**
@@ -287,13 +426,23 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
      * @param rows Result rows.
      */
     private static void assertBaseQueryResults(List<List<?>> rows) {
-        assertEquals(KEY_CNT - BASE_QRY_ARG, rows.size());
+        assertQueryResults(rows, BASE_QRY_ARG);
+    }
+
+    /**
+     * Assert query results: {@code KEY_CNT - resSize} rows, each with an id not below {@code resSize}.
+     *
+     * @param rows Result rows.
+     * @param resSize Lower bound of the expected ids (despite its name, not the number of rows).
+     */
+    private static void assertQueryResults(List<List<?>> rows, int resSize) {
+        assertEquals(KEY_CNT - resSize, rows.size());
 
         for (List<?> row : rows) {
             Long id = (Long)row.get(0);
             String name = (String)row.get(1);
 
-            assertTrue(id >= BASE_QRY_ARG);
+            assertTrue(id >= resSize);
             assertEquals(nameForId(id), name);
         }
     }
@@ -317,7 +466,7 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
      */
     @SuppressWarnings("unchecked")
     private static FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) {
-        return cache(node).query(qry.setLazy(true));
+        return cache(node).query(qry);
     }
 
     /**
@@ -325,8 +474,8 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
      *
      * @throws Exception If failed.
      */
-    private static void assertNoWorkers() throws Exception {
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+    private void assertNoWorkers() throws Exception {
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 for (Ignite node : Ignition.allGrids()) {
                     IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing();
@@ -337,7 +486,22 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest {
 
                 return MapQueryLazyWorker.activeCount() == 0;
             }
-        }, 1000L);
+        }, 1000L)) {
+            log.error("Lazy workers on nodes:");
+            // Report every node first; fail only once the whole report has been written.
+            for (Ignite node : Ignition.allGrids()) {
+                IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing();
+
+                if (idx.mapQueryExecutor().registeredLazyWorkers() != 0) {
+                    log.error("[node=" + node + ", " + "registeredLazyWorkers="
+                        + idx.mapQueryExecutor().registeredLazyWorkers() + ']');
+                }
+            }
+            // The active count is read through a static method, not per node, so it is logged once after the loop.
+            log.error("Active lazy workers: " + MapQueryLazyWorker.activeCount());
+
+            fail("There are not stopped lazy workers. See error message above.");
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java
new file mode 100644
index 0000000..9be0870
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@link ObjectPool}.
+ */
+public class ObjectPoolSelfTest extends GridCommonAbstractTest {
+    /** Pool under test. */
+    private ObjectPool<Obj> pool = new ObjectPool<>(Obj::new, 1, null, null);
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testObjectIsReusedAfterRecycling() throws Exception {
+        ObjectPoolReusable<Obj> r1 = pool.borrow();
+
+        Obj o1 = r1.object();
+
+        r1.recycle();
+
+        ObjectPoolReusable<Obj> r2 = pool.borrow();
+
+        Obj o2 = r2.object();
+        // The recycled object must be handed out again and must not have been closed.
+        assertSame(o1, o2);
+
+        assertFalse(o1.isClosed());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBorrowedObjectIsNotReturnedTwice() throws Exception {
+        ObjectPoolReusable<Obj> r1 = pool.borrow();
+        ObjectPoolReusable<Obj> r2 = pool.borrow();
+
+        assertNotSame(r1.object(), r2.object());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testObjectShouldBeClosedOnRecycleIfPoolIsFull() throws Exception {
+        ObjectPoolReusable<Obj> r1 = pool.borrow();
+        ObjectPoolReusable<Obj> r2 = pool.borrow();
+
+        Obj o2 = r2.object();
+
+        r1.recycle();
+        r2.recycle();
+        // Recycled wrappers no longer expose an object; the object recycled second is expected to be closed.
+        assertNull(r1.object());
+        assertNull(r2.object());
+
+        assertTrue(o2.isClosed());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testObjectShouldNotBeReturnedIfPoolIsFull() throws Exception {
+        ObjectPoolReusable<Obj> r1 = pool.borrow();
+        ObjectPoolReusable<Obj> r2 = pool.borrow();
+
+        r1.recycle();
+
+        assertEquals(1, pool.bagSize());
+        // A second recycle must not grow the bag beyond one element.
+        r2.recycle();
+
+        assertEquals(1, pool.bagSize());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testObjectShouldReturnedToBag() throws Exception {
+        ObjectPoolReusable<Obj> r1 = pool.borrow();
+
+        CompletableFuture.runAsync(() -> {
+            r1.recycle();
+
+            assertEquals(1, pool.bagSize());
+        }).join();
+        // The bag size must also be 1 when read outside the asynchronous task that recycled the object.
+        assertEquals(1, pool.bagSize());
+    }
+
+    /** Test object that records whether it has been closed. */
+    private static class Obj implements AutoCloseable {
+        /** Set to {@code true} by {@link #close()}. */
+        private boolean closed = false;
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            closed = true;
+        }
+
+        /**
+         * @return {@code True} if closed.
+         */
+        public boolean isClosed() {
+            return closed;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java
deleted file mode 100644
index b7b7a37..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool.Reusable;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class ThreadLocalObjectPoolSelfTest extends GridCommonAbstractTest {
-    /** */
-    private ThreadLocalObjectPool<Obj> pool = new ThreadLocalObjectPool<>(Obj::new, 1);
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testObjectIsReusedAfterRecycling() throws Exception {
-        Reusable<Obj> o1 = pool.borrow();
-        o1.recycle();
-        Reusable<Obj> o2 = pool.borrow();
-
-        assertSame(o1.object(), o2.object());
-        assertFalse(o1.object().isClosed());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBorrowedObjectIsNotReturnedTwice() throws Exception {
-        Reusable<Obj> o1 = pool.borrow();
-        Reusable<Obj> o2 = pool.borrow();
-
-        assertNotSame(o1.object(), o2.object());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testObjectShouldBeClosedOnRecycleIfPoolIsFull() throws Exception {
-        Reusable<Obj> o1 = pool.borrow();
-        Reusable<Obj> o2 = pool.borrow();
-        o1.recycle();
-        o2.recycle();
-
-        assertTrue(o2.object().isClosed());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testObjectShouldNotBeReturnedIfPoolIsFull() throws Exception {
-        Reusable<Obj> o1 = pool.borrow();
-        Reusable<Obj> o2 = pool.borrow();
-
-        o1.recycle();
-
-        assertEquals(1, pool.bagSize());
-
-        o2.recycle();
-
-        assertEquals(1, pool.bagSize());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testObjectShouldReturnedToRecyclingThreadBag() throws Exception {
-        Reusable<Obj> o1 = pool.borrow();
-
-        CompletableFuture.runAsync(() -> {
-            o1.recycle();
-
-            assertEquals(1, pool.bagSize());
-        }).join();
-
-        assertEquals(0, pool.bagSize());
-    }
-
-    /** */
-    private static class Obj implements AutoCloseable {
-        /** */
-        private boolean closed = false;
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            closed = true;
-        }
-
-        /**
-         * @return {@code True} if closed.
-         */
-        public boolean isClosed() {
-            return closed;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f97ebff9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
index dbb2c59..ac467d5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
@@ -384,11 +384,6 @@ public class RetryCauseMessageSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void cancelLazyWorkers() {
-            startedExecutor.cancelLazyWorkers();
-        }
-
-        /** {@inheritDoc} */
         @Override GridSpinBusyLock busyLock() {
             return startedExecutor.busyLock();
         }
@@ -399,19 +394,8 @@ public class RetryCauseMessageSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void stopAndUnregisterCurrentLazyWorker() {
-            startedExecutor.stopAndUnregisterCurrentLazyWorker();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void unregisterLazyWorker(MapQueryLazyWorker worker) {
-            startedExecutor.unregisterLazyWorker(worker);
-        }
-
-        /** {@inheritDoc} */
         @Override public int registeredLazyWorkers() {
             return startedExecutor.registeredLazyWorkers();
         }
     }
-
 }