You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/04/13 08:55:30 UTC

[ignite] branch sql-calcite updated: IGNITE-14262 Calcite improvements. Fix close/cancel of the query execution (get rid of numerous ExecutionCancelledException)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 44df9ee  IGNITE-14262 Calcite improvements. Fix close/cancel of the query execution (get rid of numerous ExecutionCancelledException)
44df9ee is described below

commit 44df9ee220b24399a5fc36e1954cddbfecfc7d7b
Author: zstan <st...@gmail.com>
AuthorDate: Tue Apr 13 11:54:59 2021 +0300

    IGNITE-14262 Calcite improvements. Fix close/cancel of the query execution (get rid of numerous ExecutionCancelledException)
---
 .../query/calcite/exec/ExchangeServiceImpl.java    | 39 ++++++++-----
 .../query/calcite/exec/ExecutionContext.java       | 28 ++++++++++
 .../query/calcite/exec/ExecutionServiceImpl.java   | 13 +++--
 .../query/calcite/exec/MailboxRegistryImpl.java    | 14 +++++
 .../query/calcite/exec/QueryTaskExecutor.java      | 13 +++++
 .../query/calcite/exec/QueryTaskExecutorImpl.java  |  6 ++
 .../query/calcite/exec/rel/HashAggregateNode.java  | 14 ++---
 .../query/calcite/exec/rel/MergeJoinNode.java      |  3 +-
 .../query/calcite/exec/rel/RootNode.java           |  8 ++-
 .../query/calcite/exec/rel/ScanNode.java           | 17 +++---
 .../query/calcite/exec/rel/SortNode.java           |  3 +
 .../query/calcite/CalciteQueryProcessorTest.java   | 64 ++++++++++++++++++++--
 .../processors/query/calcite/CancelTest.java       | 43 +--------------
 .../processors/query/calcite/QueryChecker.java     | 41 ++++++++++++++
 .../thread/IgniteStripedThreadPoolExecutor.java    | 15 +++++
 15 files changed, 239 insertions(+), 82 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index c795694..02af657 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -17,10 +17,14 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import com.google.common.collect.ImmutableMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -157,36 +161,45 @@ public class ExchangeServiceImpl extends AbstractService implements ExchangeServ
     /** */
     protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
         Collection<Inbox<?>> inboxes = mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+
         if (!F.isEmpty(inboxes)) {
-            for (Inbox<?> inbox : inboxes) {
-                inbox.context().cancel();
+            for (Inbox<?> inbox : inboxes)
                 inbox.context().execute(inbox::close, inbox::onError);
-            }
         }
         else if (log.isDebugEnabled()) {
             log.debug("Stale inbox cancel message received: [" +
-                "nodeId=" + nodeId + ", " +
-                "queryId=" + msg.queryId() + ", " +
-                "fragmentId=" + msg.fragmentId() + ", " +
-                "exchangeId=" + msg.exchangeId() + "]");
+                "nodeId=" + nodeId +
+                ", queryId=" + msg.queryId() +
+                ", fragmentId=" + msg.fragmentId() +
+                ", exchangeId=" + msg.exchangeId() + "]");
         }
     }
 
     /** */
     protected void onMessage(UUID nodeId, OutboxCloseMessage msg) {
         Collection<Outbox<?>> outboxes = mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+
         if (!F.isEmpty(outboxes)) {
+            List<CompletableFuture<?>> futs = new ArrayList<>(outboxes.size());
+
+            Set<ExecutionContext<?>> ctxs = new HashSet<>();
+
             for (Outbox<?> outbox : outboxes) {
-                outbox.context().cancel();
-                outbox.context().execute(outbox::close, outbox::onError);
+                CompletableFuture<?> fut = outbox.context().submit(outbox::close, outbox::onError);
+
+                futs.add(fut);
+
+                ctxs.add(outbox.context());
             }
+
+            CompletableFuture.allOf(futs.toArray(new CompletableFuture<?>[0])).thenRun(() -> ctxs.forEach(ExecutionContext::cancel));
         }
         else if (log.isDebugEnabled()) {
             log.debug("Stale oubox cancel message received: [" +
-                "nodeId=" + nodeId + ", " +
-                "queryId=" + msg.queryId() + ", " +
-                "fragmentId=" + msg.fragmentId() + ", " +
-                "exchangeId=" + msg.exchangeId() + "]");
+                "nodeId=" + nodeId +
+                ", queryId=" + msg.queryId() +
+                ", fragmentId=" + msg.fragmentId() +
+                ", exchangeId=" + msg.exchangeId() + "]");
         }
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 6d65beb..47d453b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
@@ -230,6 +231,9 @@ public class ExecutionContext<Row> implements DataContext {
      * @param task Query task.
      */
     public void execute(RunnableX task, Consumer<Throwable> onError) {
+        if (isCancelled())
+            return;
+
         executor.execute(qryId, fragmentId(), () -> {
             try {
                 task.run();
@@ -242,6 +246,29 @@ public class ExecutionContext<Row> implements DataContext {
         });
     }
 
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task. The Future's {@code get} method will
+     * return {@code null} upon <em>successful</em> completion.
+     *
+     * @param task the task to submit.
+     * @return a {@link CompletableFuture} representing pending task
+     */
+    public CompletableFuture<?> submit(RunnableX task, Consumer<Throwable> onError) {
+        assert !isCancelled() : "Call submit after execution was cancelled.";
+
+        return executor.submit(qryId, fragmentId(), () -> {
+            try {
+                task.run();
+            }
+            catch (Throwable e) {
+                onError.accept(e);
+
+                throw new IgniteException("Unexpected exception", e);
+            }
+        });
+    }
+
     /** */
     @FunctionalInterface
     public interface RunnableX {
@@ -258,6 +285,7 @@ public class ExecutionContext<Row> implements DataContext {
         return !cancelFlag.get() && cancelFlag.compareAndSet(false, true);
     }
 
+    /** */
     public boolean isCancelled() {
         return cancelFlag.get();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 354b926..fa97dbd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -951,7 +951,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
         private final Set<RemoteFragmentKey> waiting;
 
         /** */
-        private QueryState state;
+        private volatile QueryState state;
 
         /** */
         private QueryInfo(ExecutionContext<Row> ctx, MultiStepPlan plan, Node<Row> root) {
@@ -1001,10 +1001,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
                 if (state == QueryState.RUNNING)
                     state0 = state = QueryState.CLOSING;
 
-                // 1) Cancel local fragment
-                ctx.cancel();
-
-                // 2) close local fragment
+                // 1) close local fragment
                 root.closeInternal();
 
                 if (state == QueryState.CLOSING && waiting.isEmpty())
@@ -1012,12 +1009,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
             }
 
             if (state0 == QueryState.CLOSED) {
-                // 3) unregister runing query
+                // 2) unregister running query
                 running.remove(ctx.queryId());
 
                 // 4) close remote fragments
                 IgniteException wrpEx = null;
 
+                // 3) close remote fragments
                 for (UUID nodeId : remotes) {
                     try {
                         exchangeService().closeOutbox(nodeId, ctx.queryId(), -1, -1);
@@ -1030,6 +1028,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
                     }
                 }
 
+                // 4) Cancel local fragment
+                ctx.cancel();
+
                 if (wrpEx != null)
                     throw wrpEx;
             }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index 39ace60..cbf7d95 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -33,6 +33,8 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Mailbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -49,9 +51,11 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
     private final Map<MailboxKey, Inbox<?>> remotes;
 
     /** */
+    @GridToStringExclude
     private final DiscoveryEventListener discoLsnr;
 
     /** */
+    @GridToStringExclude
     private GridEventStorageManager evtMgr;
 
     /**
@@ -164,6 +168,11 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
         return filter;
     }
 
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MailboxRegistryImpl.class, this);
+    }
+
     /** */
     private static class MailboxKey {
         /** */
@@ -198,5 +207,10 @@ public class MailboxRegistryImpl extends AbstractService implements MailboxRegis
             res = 31 * res + (int) (exchangeId ^ (exchangeId >>> 32));
             return res;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MailboxKey.class, this);
+        }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
index 4d7861c..830a4d2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
 
@@ -33,4 +34,16 @@ public interface QueryTaskExecutor extends Service {
      * @param qryTask Query task.
      */
     void execute(UUID qryId, long fragmentId, Runnable qryTask);
+
+    /**
+     * Returns a new CompletableFuture that is asynchronously completed
+     * by a task running in the given executor after it runs the given
+     * action.
+     *
+     * @param qryId Id of the query this task created for.
+     * @param fragmentId Id of the particular fragment this task created for.
+     * @param qryTask The task to submit.
+     * @return the new CompletableFuture
+     */
+    CompletableFuture<?> submit(UUID qryId, long fragmentId, Runnable qryTask);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
index b0db389..9a31205 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
@@ -82,6 +83,11 @@ public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskE
     }
 
     /** {@inheritDoc} */
+    @Override public CompletableFuture<?> submit(UUID qryId, long fragmentId, Runnable qryTask) {
+        return stripedThreadPoolExecutor.submit(qryTask, hash(qryId, fragmentId));
+    }
+
+    /** {@inheritDoc} */
     @Override public void onStart(GridKernalContext ctx) {
         exceptionHandler(ctx.uncaughtExceptionHandler());
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
index 1f4728e..0911e4c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
@@ -111,7 +111,7 @@ public class HashAggregateNode<Row> extends AbstractNode<Row> implements SingleN
         if (waiting == 0)
             source().request(waiting = IN_BUFFER_SIZE);
         else if (!inLoop)
-            context().execute(this::doFlush, this::onError);
+            context().execute(this::flush, this::onError);
     }
 
     /** {@inheritDoc} */
@@ -158,14 +158,12 @@ public class HashAggregateNode<Row> extends AbstractNode<Row> implements SingleN
     }
 
     /** */
-    private void doFlush() throws Exception {
-        checkState();
+    private void flush() throws Exception {
+        if (isClosed())
+            return;
 
-        flush();
-    }
+        checkState();
 
-    /** */
-    private void flush() throws Exception {
         assert waiting == -1;
 
         int processed = 0;
@@ -189,7 +187,7 @@ public class HashAggregateNode<Row> extends AbstractNode<Row> implements SingleN
 
                 if (processed >= IN_BUFFER_SIZE && requested > 0) {
                     // allow others to do their job
-                    context().execute(this::doFlush, this::onError);
+                    context().execute(this::flush, this::onError);
 
                     return;
                 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index e97968b85..3d291c7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -283,7 +283,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
         @Override protected void join() throws Exception {
             inLoop = true;
             try {
-                while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) {
+                while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty()
+                    || rightMaterialization != null)) {
                     checkState();
 
                     if (left == null)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index a99693b..8b8838a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -22,6 +22,7 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -121,7 +122,12 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
 
     /** {@inheritDoc} */
     @Override public void closeInternal() {
-        context().execute(() -> sources().forEach(U::closeQuiet), this::onError);
+        try {
+            context().submit(() -> sources().forEach(U::closeQuiet), this::onError).get();
+        }
+        catch (InterruptedException | ExecutionException e) {
+            U.warn(context().planningContext().logger(), "Execution is cancelled.", e);
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index c4805d6..c6ed6d5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -59,14 +59,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
         requested = rowsCnt;
 
         if (!inLoop)
-            context().execute(this::doPush, this::onError);
-    }
-
-    /** */
-    private void doPush() throws Exception {
-        checkState();
-
-        push();
+            context().execute(this::push, this::onError);
     }
 
     /** {@inheritDoc} */
@@ -96,6 +89,11 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
 
     /** */
     private void push() throws Exception {
+        if (isClosed())
+            return;
+
+        checkState();
+
         inLoop = true;
         try {
             if (it == null)
@@ -110,7 +108,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
 
                 if (++processed == IN_BUFFER_SIZE && requested > 0) {
                     // allow others to do their job
-                    context().execute(this::doPush, this::onError);
+                    context().execute(this::push, this::onError);
 
                     return;
                 }
@@ -125,6 +123,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row>
             it = null;
 
             requested = 0;
+
             downstream().end();
         }
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 9f7a31f..00f3fdb5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -109,6 +109,9 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
 
     /** */
     private void flush() throws Exception {
+        if (isClosed())
+            return;
+
         assert waiting == -1;
 
         int processed = 0;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index e5a94a9..bee1930 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -39,18 +39,27 @@ import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.awaitReservationsRelease;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
@@ -62,6 +71,22 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     /** */
     private static IgniteEx client;
 
+    /** Log listener. */
+    private ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
+    /** */
+    private static LogListener lsnr = LogListener.matches(s ->
+        s.contains("Execution is cancelled") ||
+        s.contains("NullPointer") ||
+        s.contains("AssertionError")).build();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        listeningLog.registerListener(lsnr);
+
+        return super.getConfiguration(igniteInstanceName).setGridLogger(listeningLog);
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         startGrids(5);
@@ -140,9 +165,11 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             .setCacheMode(CacheMode.REPLICATED)
         );
 
-        Map<Integer, RISK> mRisk = new HashMap<>(65000);
+        int numRiskRows = 65_000;
+
+        Map<Integer, RISK> mRisk = new HashMap<>(numRiskRows);
 
-        for (int i = 0; i < 65000; i++)
+        for (int i = 0; i < numRiskRows; i++)
             mRisk.put(i, new RISK(i));
 
         RISK.putAll(mRisk);
@@ -191,6 +218,33 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
                 assertEquals(40L, res.get(0).get(0));
             }
         }
+
+        assertFalse(lsnr.check());
+
+        listeningLog.clearListeners();
+
+        awaitReservationsRelease("RISK");
+        awaitReservationsRelease("TRADE");
+        awaitReservationsRelease("BATCH");
+    }
+
+    /**
+     * Waits until, on every grid, no (Out|In)boxes remain registered or all of their execution contexts are cancelled.
+     */
+    private void checkContextCancelled() throws IgniteInterruptedCheckedException {
+        for (Ignite instance : G.allGrids()) {
+            QueryEngine engineCli = Commons.lookupComponent(((IgniteEx)instance).context(), QueryEngine.class);
+
+            MailboxRegistryImpl mailReg = GridTestUtils.getFieldValue(engineCli, CalciteQueryProcessor.class, "mailboxRegistry");
+
+            Map<Object, Inbox<?>> remotes = GridTestUtils.getFieldValue(mailReg, MailboxRegistryImpl.class, "remotes");
+
+            Map<Object, Outbox<?>> locals = GridTestUtils.getFieldValue(mailReg, MailboxRegistryImpl.class, "locals");
+
+            waitForCondition(() -> remotes.isEmpty() || remotes.values().stream().allMatch(s -> s.context().isCancelled()), 5_000);
+
+            waitForCondition(() -> locals.isEmpty() || locals.values().stream().allMatch(s -> s.context().isCancelled()), 5_000);
+        }
     }
 
     /** */
@@ -1005,8 +1059,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     }
 
     /** */
-    private List<List<?>> sql(String sql) {
-        QueryEngine engineSrv = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+    private List<List<?>> sql(String sql) throws IgniteInterruptedCheckedException {
+        QueryEngine engineSrv = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
 
         assertTrue(client.configuration().isClientMode());
 
@@ -1022,6 +1076,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             allSrv = srvCursor.getAll();
 
             assertEquals(allSrv.size(), cliCursor.getAll().size());
+
+            checkContextCancelled();
         }
 
         return allSrv;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
index 48af61c..b20c3c3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.calcite;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
@@ -29,30 +28,24 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryEngine;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static java.util.Collections.singletonList;
 import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG;
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.awaitReservationsRelease;
 
 /**
  * Cancel query test.
  */
 public class CancelTest extends GridCommonAbstractTest {
-    /** Partition release timeout. */
-    private static final long PART_RELEASE_TIMEOUT = 5000L;
-
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         startGrids(2);
@@ -121,6 +114,7 @@ public class CancelTest extends GridCommonAbstractTest {
     /**
      *
      */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-14289")
     @Test
     public void testNotOriginatorNodeStop() throws Exception {
         QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
@@ -244,35 +238,4 @@ public class CancelTest extends GridCommonAbstractTest {
 
         awaitPartitionMapExchange();
     }
-
-    /**
-     * @param cacheName Cache to check
-     * @throws IgniteInterruptedCheckedException
-     */
-    void awaitReservationsRelease(String cacheName) throws IgniteInterruptedCheckedException {
-        for (Ignite ign : G.allGrids())
-            awaitReservationsRelease((IgniteEx)ign, "TEST");
-    }
-
-    /**
-     * @param node Node to check reservation.
-     * @param cacheName Cache to check reservations.
-     */
-    void awaitReservationsRelease(IgniteEx node, String cacheName) throws IgniteInterruptedCheckedException {
-        GridDhtAtomicCache c = GridTestUtils.getFieldValue(node.cachex(cacheName), "delegate");
-
-        List<GridDhtLocalPartition> parts = c.topology().localPartitions();
-
-        GridTestUtils.waitForCondition(() -> {
-            for (GridDhtLocalPartition p : parts) {
-                if (p.reservations() > 0)
-                    return false;
-            }
-
-            return true;
-        }, PART_RELEASE_TIMEOUT);
-
-        for (GridDhtLocalPartition p : parts)
-            assertEquals("Partition is reserved: " + p, 0, p.reservations());
-    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index c50b1ea..67a25cb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -28,10 +28,17 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.ignite.Ignite;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.query.QueryEngine;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matcher;
 import org.hamcrest.core.SubstringMatcher;
@@ -45,6 +52,9 @@ import static org.junit.Assert.fail;
  *  Query checker.
  */
 public abstract class QueryChecker {
+    /** Partition release timeout. */
+    private static final long PART_RELEASE_TIMEOUT = 5_000L;
+
     /**
      * Ignite table scan matcher.
      *
@@ -407,4 +417,35 @@ public abstract class QueryChecker {
             return 0;
         }
     }
+
+    /**
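+     * @param cacheName Name of the cache whose partition reservations are awaited on all started grids.
+     * @throws IgniteInterruptedCheckedException If the thread is interrupted while waiting.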
+     */
+    public static void awaitReservationsRelease(String cacheName) throws IgniteInterruptedCheckedException {
+        for (Ignite ign : G.allGrids())
+            awaitReservationsRelease((IgniteEx)ign, cacheName);
+    }
+
+    /**
+     * @param node Node to check reservation.
+     * @param cacheName Cache to check reservations.
+     */
+    public static void awaitReservationsRelease(IgniteEx node, String cacheName) throws IgniteInterruptedCheckedException {
+        GridDhtAtomicCache c = GridTestUtils.getFieldValue(node.cachex(cacheName), "delegate");
+
+        List<GridDhtLocalPartition> parts = c.topology().localPartitions();
+
+        GridTestUtils.waitForCondition(() -> {
+            for (GridDhtLocalPartition p : parts) {
+                if (p.reservations() > 0)
+                    return false;
+            }
+
+            return true;
+        }, PART_RELEASE_TIMEOUT);
+
+        for (GridDhtLocalPartition p : parts)
+            assertEquals("Partition is reserved: " + p, 0, p.reservations());
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 6c1c62f..114120b 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -90,6 +91,20 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     }
 
     /**
+     * Submits a {@link Runnable} task for execution and returns a {@link CompletableFuture} representing that task.
+     * Tasks submitted with the same {@code idx} will be executed in the same thread.
+     *
+     * @param task The task to submit.
+     * @param idx Index used to select the thread that executes the task.
+     * @return A {@link CompletableFuture} representing pending completion of the task.
+     * @throws RejectedExecutionException if the task cannot be scheduled for execution.
+     * @throws NullPointerException if the task is {@code null}.
+     */
+    public CompletableFuture<?> submit(Runnable task, int idx) {
+        return CompletableFuture.runAsync(task, execs[threadId(idx)]);
+    }
+
+    /**
      * @param idx Index.
      * @return Stripped thread ID.
      */