You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/06/19 10:38:04 UTC

[ignite] branch master updated: IGNITE-11918: SQL: Extend test coverage for KILL QUERY. This closes #6611.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b6a918  IGNITE-11918: SQL: Extend test coverage for KILL QUERY. This closes #6611.
0b6a918 is described below

commit 0b6a918b2ab0881fe114cc1943e97948325d710c
Author: Pavel Kuznetsov <pa...@gmail.com>
AuthorDate: Wed Jun 19 13:37:15 2019 +0300

    IGNITE-11918: SQL: Extend test coverage for KILL QUERY. This closes #6611.
---
 modules/core/src/main/resources/ignite.properties  |   2 +-
 .../query/h2/twostep/GridReduceQueryExecutor.java  |   9 +-
 .../processors/query/KillQueryFromClientTest.java  |   4 +-
 .../query/KillQueryFromNeighbourTest.java          |   4 +-
 .../internal/processors/query/KillQueryTest.java   | 848 +++++++++++++++------
 modules/yarn/src/main/resources/ignite.properties  |   2 +-
 6 files changed, 611 insertions(+), 258 deletions(-)

diff --git a/modules/core/src/main/resources/ignite.properties b/modules/core/src/main/resources/ignite.properties
index 57dcd33..90d5787 100644
--- a/modules/core/src/main/resources/ignite.properties
+++ b/modules/core/src/main/resources/ignite.properties
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-ignite.version=2.7.0-SNAPSHOT
+ignite.version=2.8.0-SNAPSHOT
 ignite.build=0
 ignite.revision=DEV
 ignite.rel.date=01011970
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index fb4779e..5f3886c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -386,6 +386,13 @@ public class GridReduceQueryExecutor {
         ReduceQueryRun lastRun = null;
 
         for (int attempt = 0;; attempt++) {
+            try {
+                cancel.checkCancelled();
+            }
+            catch (QueryCancelledException cancelEx) {
+                throw new CacheException("Failed to run reduce query locally. " + cancelEx.getMessage(),  cancelEx);
+            }
+
             if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) {
                 UUID retryNodeId = lastRun.retryNodeId();
                 String retryCause = lastRun.retryCause();
@@ -724,7 +731,7 @@ public class GridReduceQueryExecutor {
                         cause = disconnectedErr;
                 }
 
-                throw new CacheException("Failed to run reduce query locally.", cause);
+                throw new CacheException("Failed to run reduce query locally. " + cause.getMessage(), cause);
             }
             finally {
                 if (detachedConn != null)
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromClientTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromClientTest.java
index 92fb3f9..9c4b9bc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromClientTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromClientTest.java
@@ -20,13 +20,13 @@ package org.apache.ignite.internal.processors.query;
 
 import org.apache.ignite.internal.IgniteEx;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
 /**
  * Test KILL QUERY requested from client node.
  */
 
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class KillQueryFromClientTest extends KillQueryTest {
     /** {@inheritDoc} */
     @Override protected IgniteEx getKillRequestNode()  {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromNeighbourTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromNeighbourTest.java
index 3b0a0ea..c4e0756 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromNeighbourTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryFromNeighbourTest.java
@@ -20,12 +20,12 @@ package org.apache.ignite.internal.processors.query;
 
 import org.apache.ignite.internal.IgniteEx;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
 /**
  * Test KILL QUERY requested from neighbour server node.
  */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class KillQueryFromNeighbourTest extends KillQueryTest {
     /** {@inheritDoc} */
     @Override protected IgniteEx getKillRequestNode() {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
index fa6553e..a96dafc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
@@ -19,6 +19,7 @@
 package org.apache.ignite.internal.processors.query;
 
 import java.io.Serializable;
+import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -32,14 +33,23 @@ import java.util.UUID;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -47,12 +57,27 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
+import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
+import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
+import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult;
+import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapper;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
 import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -60,14 +85,17 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
+import static java.util.Arrays.stream;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
@@ -76,8 +104,24 @@ import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
  * Test KILL QUERY requested from the same node where quere was executed.
  */
 @SuppressWarnings({"ThrowableNotThrown", "AssertWithSideEffects"})
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
+// We need to set this threshold bigger than partitions count to force partition pruning for the BETWEEN case.
+// see org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor.tryExtractBetween
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN, value = "21")
 public class KillQueryTest extends GridCommonAbstractTest {
+    /** Generates values for the {@link #asyncCancel} parameter. */
+    @Parameterized.Parameters(name = "asyncCancel = {0}")
+    public static Iterable<Object[]> valuesForAsync() {
+        return Arrays.asList(new Object[][] {
+            {true},
+            {false}
+        });
+    }
+
+    /** Whether the current test execution should use the async or non-async cancel mechanism. */
+    @Parameterized.Parameter
+    public boolean asyncCancel;
+
     /** IP finder. */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
@@ -98,6 +142,9 @@ public class KillQueryTest extends GridCommonAbstractTest {
     /** Timeout for checking async result. */
     public static final int CHECK_RESULT_TIMEOUT = 1_000;
 
+    /** Number of partitions in the test cache. Keep it small to have enough rows in each partition. */
+    public static final int PARTS_CNT = 20;
+
     /** Connection. */
     private Connection conn;
 
@@ -116,8 +163,11 @@ public class KillQueryTest extends GridCommonAbstractTest {
     /** Table count. */
     private static AtomicInteger tblCnt = new AtomicInteger();
 
-    /** Barrier. */
-    private static volatile CyclicBarrier barrier;
+    /** Barrier. Needed to test unsupported cancelation. Doesn't block threads (parties=1) by default. */
+    private static volatile CyclicBarrier barrier = new CyclicBarrier(1);
+
+    /** Allows to block messages, issued FROM the client node. */
+    private static TestRecordingCommunicationSpi clientBlocker;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -125,6 +175,8 @@ public class KillQueryTest extends GridCommonAbstractTest {
 
         CacheConfiguration<?, ?> cache = GridAbstractTest.defaultCacheConfiguration();
 
+        cache.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
+
         cache.setCacheMode(PARTITIONED);
         cache.setBackups(1);
         cache.setWriteSynchronizationMode(FULL_SYNC);
@@ -133,16 +185,22 @@ public class KillQueryTest extends GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(cache);
 
-        if (++cntr == NODES_COUNT)
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        if (++cntr == NODES_COUNT) {
             cfg.setClientMode(true);
 
-        cfg.setDiscoverySpi(new TcpDiscoverySpi() {
+            clientBlocker = commSpi;
+        }
 
+        cfg.setDiscoverySpi(new TcpDiscoverySpi() {
             @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
-                if (CustomMessageWrapper.class.isAssignableFrom(msg.getClass())) {
+                if (msg instanceof CustomMessageWrapper) {
                     DiscoveryCustomMessage delegate = ((CustomMessageWrapper)msg).delegate();
 
-                    if (DynamicCacheChangeBatch.class.isAssignableFrom(delegate.getClass())) {
+                    if (delegate instanceof DynamicCacheChangeBatch) {
                         try {
                             awaitTimeout();
                         }
@@ -151,7 +209,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
                         }
 
                     }
-                    else if (SchemaProposeDiscoveryMessage.class.isAssignableFrom(delegate.getClass())) {
+                    else if (delegate instanceof SchemaProposeDiscoveryMessage) {
                         try {
                             awaitTimeout();
                         }
@@ -168,19 +226,91 @@ public class KillQueryTest extends GridCommonAbstractTest {
         return cfg;
     }
 
+    /**
+     * Creates and populates a new cache that is used in distributed join scenario: We have table of Persons with some
+     * autogenerated PK. Join filter should be based on Person.id column which is not collocated with the pk. Result
+     * size of such join (with eq condition) is {@link #MAX_ROWS} rows.
+     *
+     * @param cacheName name of the created cache.
+     * @param shift integer to avoid collocation, put different value for the different caches.
+     */
+    private void createJoinCache(String cacheName, int shift) {
+        CacheConfiguration<Long, Person> ccfg = GridAbstractTest.defaultCacheConfiguration();
+
+        ccfg.setName(cacheName);
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setSqlFunctionClasses(TestSQLFunctions.class);
+
+        ccfg.setQueryEntities(Collections.singleton(
+            new QueryEntity(Integer.class.getName(), Person.class.getName())
+                .setTableName("PERSON")
+                .setKeyFieldName("rec_id") // PK
+                .addQueryField("rec_id", Integer.class.getName(), null)
+                .addQueryField("id", Integer.class.getName(), null)
+                .addQueryField("lastName", String.class.getName(), null)
+                .setIndexes(Collections.singleton(new QueryIndex("id", true, "idx_" + cacheName)))
+        ));
+
+        grid(0).createCache(ccfg);
+
+        try (IgniteDataStreamer<Object, Object> ds = grid(0).dataStreamer(cacheName)) {
+
+            for (int recordId = 0; recordId < MAX_ROWS; recordId++) {
+                // If two caches have the same PK, FK fields ("id") will be different.
+                int intTabIdFK = (recordId + shift) % MAX_ROWS;
+
+                ds.addData(recordId,
+                    new Person(intTabIdFK,
+                        "Name_" + recordId,
+                        "LastName_" + recordId,
+                        42));
+            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
         cntr = 0;
 
+        GridQueryProcessor.idxCls = MockedIndexing.class;
+
         startGrids(NODES_COUNT);
 
-        for (int i = 0; i < MAX_ROWS; ++i) {
-            grid(0).cache(GridAbstractTest.DEFAULT_CACHE_NAME).put(i, i);
+        // Let's set baseline topology manually. Doing so we are sure that partitions are distributed between our 2 srv
+        // nodes and do not all belong to a single node.
+        awaitPartitionMapExchange(true, true, null);
+
+        long curTop = grid(0).cluster().topologyVersion();
+
+        grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+        grid(0).cluster().setBaselineTopology(curTop);
 
-            grid(0).cache(GridAbstractTest.DEFAULT_CACHE_NAME).put((long)i, (long)i);
+        awaitPartitionMapExchange(true, true, null);
+
+        // Populate data.
+        try (IgniteDataStreamer<Object, Object> ds = grid(0).dataStreamer(GridAbstractTest.DEFAULT_CACHE_NAME)) {
+            for (int i = 0; i < MAX_ROWS; ++i) {
+                ds.addData(i, i);
+
+                ds.addData((long)i, (long)i);
+            }
         }
+
+        createJoinCache("PERS1", 1);
+        createJoinCache("PERS2", 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        GridQueryProcessor.idxCls = null;
+
+        super.afterTestsStopped();
     }
 
     /**
@@ -197,7 +327,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
      */
     @Before
     public void before() throws Exception {
-        TestSQLFunctions.init();
+        TestSQLFunctions.reset();
 
         newBarrier(1);
 
@@ -212,6 +342,8 @@ public class KillQueryTest extends GridCommonAbstractTest {
         ignite = grid(0);
 
         igniteForKillRequest = getKillRequestNode();
+
+        MockedIndexing.resetToDefault();
     }
 
     /**
@@ -221,6 +353,10 @@ public class KillQueryTest extends GridCommonAbstractTest {
      */
     @After
     public void after() throws Exception {
+        MockedIndexing.resetToDefault();
+
+        clientBlocker.stopBlock(false);
+
         if (stmt != null && !stmt.isClosed()) {
             stmt.close();
 
@@ -233,176 +369,130 @@ public class KillQueryTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Trying to cancel COPY FROM command.
+     * Tries to cancel COPY FROM command, then checks such cancellation is unsupported.
      *
-     * @throws Exception In case of failure.
+     * 1) Run COPY query, got it suspended in the middle. 2) Try to cancel it, get expected exception. 3) Wake up the
+     * COPY. 4) Check COPY is done.
      */
     @Test
     public void testBulkLoadCancellationUnsupported() throws Exception {
-        checkCreateTableCancellationUnsupported(false);
-    }
-
-    /**
-     * Trying to async cancel COPY FROM command.
-     *
-     * @throws Exception In case of failure.
-     */
-    @Test
-    public void testAsyncBulkLoadCancellationUnsupported() throws Exception {
-        checkCreateTableCancellationUnsupported(true);
-    }
-
-    /**
-     * Trying to cancel COPY FROM command.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception In case of failure.
-     */
-    public void checkBulkLoadCancellationUnsupported(boolean async) throws Exception {
         String path = Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv"))
             .getAbsolutePath();
 
-        String sqlPrepare = "CREATE TABLE " + currentTestTableName() +
+        String createTab = "CREATE TABLE " + currentTestTableName() +
             "(id integer primary key, age integer, firstName varchar, lastname varchar)";
-        String sqlCmd = "COPY FROM '" + path + "'" +
+
+        String copy = "COPY FROM '" + path + "'" +
             " INTO " + currentTestTableName() +
-            " (_key, age, firstName, lastName)" +
+            " (id, age, firstName, lastName)" +
             " format csv charset 'ascii'";
 
-        checkCancellationUnsupported(
-            Arrays.asList(sqlPrepare),
-            sqlCmd,
-            async);
+        // It's important to COPY from the client node: in this case datastreamer doesn't perform local updates so
+        // it sends communication messages which we can hold.
+        IgniteEx clientNode = grid(NODES_COUNT - 1);
+
+        try (Connection clConn = GridTestUtils.connect(clientNode, null);
+             final Statement client = clConn.createStatement()) {
+            client.execute(createTab);
+
+            // Suspend further copy query by holding data streamer messages.
+            clientBlocker.blockMessages((dstNode, msg) -> msg instanceof DataStreamerRequest);
+
+            IgniteInternalFuture<Boolean> copyIsDone = GridTestUtils.runAsync(() -> client.execute(copy));
+
+            // Wait at least one streamer message, that means copy started.
+            clientBlocker.waitForBlocked(1, TIMEOUT);
+
+            // Query can be found only on the connected node.
+            String globQryId = findOneRunningQuery(copy, clientNode);
+
+            GridTestUtils.assertThrowsAnyCause(log,
+                () -> igniteForKillRequest.cache(DEFAULT_CACHE_NAME).query(createKillQuery(globQryId, asyncCancel)),
+                CacheException.class,
+                "Query doesn't support cancellation");
+
+            // Releases copy.
+            clientBlocker.stopBlock(true);
+
+            copyIsDone.get(TIMEOUT);
+
+            int tabSize = clientNode.cache(DEFAULT_CACHE_NAME)
+                .query(new SqlFieldsQuery("SELECT * FROM " + currentTestTableName() + " ").setSchema("PUBLIC"))
+                .getAll()
+                .size();
+
+            assertEquals("COPY command inserted incorrect number of rows.", 1, tabSize);
+        }
     }
 
     /**
-     * Trying to cancel CREATE TABLE command.
+     * Finds global id of the specified query on the specified node. Expecting exactly one result.
      *
-     * @throws Exception In case of failure.
+     * @param query Query text to find id for.
+     * @param node Node handle to the node, which initiated the query.
      */
-    @Test
-    public void testCreateTableCancellationUnsupported() throws Exception {
-        checkCreateTableCancellationUnsupported(false);
+    private String findOneRunningQuery(String query, IgniteEx node) {
+        List<GridRunningQueryInfo> qryList = findQueriesOnNode(query, node);
+
+        assertEquals("Expected only one running query: " + query + "\nBut found: " + qryList, 1, qryList.size());
+
+        return qryList.get(0).globalQueryId();
     }
 
     /**
-     * Trying to async cancel CREATE TABLE command.
+     * Finds queries that have the specified text and were initially run on the specified server node.
      *
-     * @throws Exception In case of failure.
+     * @param query text of the query to find.
+     * @param node server node that runs the reduce query.
      */
-    @Test
-    public void testAsyncCreateTableCancellationUnsupported() throws Exception {
-        checkCreateTableCancellationUnsupported(true);
+    private List<GridRunningQueryInfo> findQueriesOnNode(String query, IgniteEx node) {
+        List<GridRunningQueryInfo> allQrs = (List<GridRunningQueryInfo>)node.context().query().runningQueries(-1);
+
+        return allQrs.stream()
+            .filter(q -> q.query().equals(query))
+            .collect(Collectors.toList());
     }
 
     /**
      * Trying to cancel CREATE TABLE command.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception In case of failure.
      */
-    public void checkCreateTableCancellationUnsupported(boolean async) throws Exception {
+    @Test
+    public void testCreateTableCancellationUnsupported() throws Exception {
         checkCancellationUnsupported(Collections.<String>emptyList(),
             "CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
-            async);
+            asyncCancel);
     }
 
     /**
      * Trying to cancel ALTER TABLE command.
-     *
-     * @throws Exception In case of failure.
      */
     @Test
     public void testAlterTableCancellationUnsupported() throws Exception {
-        checkAlterTableCancellationUnsupported(false);
-    }
-
-    /**
-     * Trying to async cancel ALTER TABLE command.
-     *
-     * @throws Exception In case of failure.
-     */
-    @Test
-    public void testAsyncAlterTableCancellationUnsupported() throws Exception {
-        checkAlterTableCancellationUnsupported(true);
-    }
-
-    /**
-     * Trying to cancel ALTER TABLE command.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception In case of failure.
-     */
-    private void checkAlterTableCancellationUnsupported(boolean async) throws Exception {
         checkCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
             "ALTER TABLE " + currentTestTableName() + " ADD COLUMN COL VARCHAR",
-            async);
+            asyncCancel);
     }
 
     /**
      * Trying to cancel CREATE INDEX command.
-     *
-     * @throws Exception In case of failure.
      */
     @Test
     public void testCreateIndexCancellationUnsupported() throws Exception {
-        checkCreateIndexCancellationUnsupported(false);
-    }
-
-    /**
-     * Trying to async cancel CREATE INDEX command.
-     *
-     * @throws Exception In case of failure.
-     */
-    @Test
-    public void testAsyncCreateIndexCancellationUnsupported() throws Exception {
-        checkCreateIndexCancellationUnsupported(true);
-    }
-
-    /**
-     * Trying to cancel CREATE INDEX command.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception In case of failure.
-     */
-    private void checkCreateIndexCancellationUnsupported(boolean async) throws Exception {
         checkCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
             "CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)",
-            async);
+            asyncCancel);
     }
 
     /**
      * Trying to cancel DROP INDEX command.
-     *
-     * @throws Exception In case of failure.
      */
     @Test
     public void testDropIndexCancellationUnsupported() throws Exception {
-        checkDropIndexCancellationUnsupported(false);
-    }
-
-    /**
-     * Trying to async cancel DROP INDEX command.
-     *
-     * @throws Exception In case of failure.
-     */
-    @Test
-    public void testAsyncDropIndexCancellationUnsupported() throws Exception {
-        checkDropIndexCancellationUnsupported(true);
-    }
-
-    /**
-     * Trying to cancel DROP INDEX command.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception In case of failure.
-     */
-    private void checkDropIndexCancellationUnsupported(boolean async) throws Exception {
         checkCancellationUnsupported(
             Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
                 "CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)"),
             "DROP INDEX " + currentTestTableName() + "_IDX",
-            async);
+            asyncCancel);
     }
 
     /**
@@ -422,7 +512,8 @@ public class KillQueryTest extends GridCommonAbstractTest {
      * @param async Execute cancellation in ASYNC mode.
      * @throws Exception In case of failure.
      */
-    private void checkCancellationUnsupported(List<String> prepareSteps, String sqlCmd, boolean async) throws Exception {
+    private void checkCancellationUnsupported(List<String> prepareSteps, String sqlCmd,
+        boolean async) throws Exception {
         for (String sql : prepareSteps) {
             try {
                 stmt.execute(sql);
@@ -444,32 +535,15 @@ public class KillQueryTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Trying to cancel unexist query.
+     * Trying to cancel non-existing query.
      */
     @Test
     public void testKillUnknownQry() {
-        checkKillUnknownQry(false);
-    }
-
-    /**
-     * Trying to async cancel unexist query.
-     */
-    @Test
-    public void testAsyncKillUnknownQry() {
-        checkKillUnknownQry(true);
-    }
-
-    /**
-     * Trying to cancel unexist query.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     */
-    private void checkKillUnknownQry(boolean async) {
         UUID nodeId = ignite.localNode().id();
 
         GridTestUtils.assertThrows(log, () -> {
             igniteForKillRequest.cache(DEFAULT_CACHE_NAME)
-                .query(createKillQuery(nodeId, Long.MAX_VALUE, async));
+                .query(createKillQuery(nodeId, Long.MAX_VALUE, asyncCancel));
 
             return null;
         }, CacheException.class, "Query with provided ID doesn't exist [nodeId=" + nodeId);
@@ -480,26 +554,9 @@ public class KillQueryTest extends GridCommonAbstractTest {
      */
     @Test
     public void testKillQryUnknownNode() {
-        checkKillQryUnknownNode(false);
-    }
-
-    /**
-     * Trying to async cancel query on unknown node.
-     */
-    @Test
-    public void testAsyncKillQryUnknownNode() {
-        checkKillQryUnknownNode(true);
-    }
-
-    /**
-     * Trying to cancel query on unexist node.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     */
-    private void checkKillQryUnknownNode(boolean async) {
         GridTestUtils.assertThrows(log, () -> {
             igniteForKillRequest.cache(DEFAULT_CACHE_NAME)
-                .query(createKillQuery(UUID.randomUUID(), Long.MAX_VALUE, async));
+                .query(createKillQuery(UUID.randomUUID(), Long.MAX_VALUE, asyncCancel));
 
             return null;
         }, CacheException.class, "Failed to cancel query, node is not alive");
@@ -507,44 +564,22 @@ public class KillQueryTest extends GridCommonAbstractTest {
 
     /**
      * Trying to kill already killed query. No exceptions expected.
-     *
-     * @throws Exception If failed.
      */
     @Test
     public void testKillAlreadyKilledQuery() throws Exception {
-        checkKillAlreadyKilledQuery(false);
-    }
-
-    /**
-     * Trying to kill already killed query. No exceptions expected.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testAsyncKillAlreadyKilledQuery() throws Exception {
-        checkKillAlreadyKilledQuery(true);
-    }
-
-    /**
-     * Trying to kill already killed query. No exceptions expected.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception If failed.
-     */
-    private void checkKillAlreadyKilledQuery(boolean async) throws Exception {
         IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-        FieldsQueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery("select * from Integer"));
+        FieldsQueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery("select * from Integer where awaitLatchCancelled() = 0"));
 
         List<GridRunningQueryInfo> runningQueries = (List<GridRunningQueryInfo>)ignite.context().query().runningQueries(-1);
 
         GridRunningQueryInfo runQryInfo = runningQueries.get(0);
 
-        SqlFieldsQuery killQry = createKillQuery(runQryInfo.globalQueryId(), async);
+        SqlFieldsQuery killQry = createKillQuery(runQryInfo.globalQueryId(), asyncCancel);
 
         IgniteCache<Object, Object> reqCache = igniteForKillRequest.cache(DEFAULT_CACHE_NAME);
 
-        IgniteInternalFuture killFut = GridTestUtils.runAsync(() -> reqCache.query(killQry));
+        IgniteInternalFuture killFut = cancel(1, asyncCancel);
 
         GridTestUtils.assertThrows(log,
             () -> cur.iterator().next(),
@@ -579,36 +614,37 @@ public class KillQueryTest extends GridCommonAbstractTest {
 
     /**
      * Trying to cancel long running query. No exceptions expected.
-     *
-     * @throws Exception If failed.
      */
     @Test
     public void testCancelQuery() throws Exception {
-        checkCancelQuery(false);
+        IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
+
+        GridTestUtils.assertThrows(log, () -> {
+            stmt.executeQuery("select * from Integer where _key in " +
+                "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()");
+
+            return null;
+        }, SQLException.class, "The query was cancelled while executing.");
+
+        // Ensures that there were no exceptions within async cancellation process.
+        cancelRes.get(CHECK_RESULT_TIMEOUT);
     }
 
     /**
-     * Trying to async cancel long running query. No exceptions expected.
-     *
-     * @throws Exception If failed.
+     * Trying to cancel long running query if partition pruning does its job. It's important to set {@link
+     * IgniteSystemProperties#IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN} bigger than partitions count {@link
+     * #PARTS_CNT}
      */
     @Test
-    public void testAsyncCancelQuery() throws Exception {
-        checkCancelQuery(true);
-    }
+    public void testCancelQueryPartitionPruning() throws Exception {
+        IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
 
-    /**
-     * Trying to cancel long running query. No exceptions expected.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception If failed.
-     */
-    private void checkCancelQuery(boolean async) throws Exception {
-        IgniteInternalFuture cancelRes = cancel(1, async);
+        final int ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL = 400;
 
         GridTestUtils.assertThrows(log, () -> {
-            stmt.executeQuery("select * from Integer where _key in " +
-                "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()");
+            stmt.executeQuery("select * from Integer where _key between 1000 and 2000 " +
+                "and awaitLatchCancelled() = 0 " +
+                "and shouldNotBeCalledMoreThan(" + ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL + ")");
 
             return null;
         }, SQLException.class, "The query was cancelled while executing.");
@@ -618,32 +654,58 @@ public class KillQueryTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Trying to cancel long running multiple statements query. No exceptions expected.
-     *
-     * @throws Exception If failed.
+     * Check that local query can be canceled either using async or non-async method. Local query is performed using
+     * cache.query() API with "local" property "true".
      */
     @Test
-    public void testKillMultipleStatementsQuery() throws Exception {
-        checkKillMultipleStatementsQuery(false);
+    public void testCancelLocalQueryNative() throws Exception {
+        IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
+
+        GridTestUtils.assertThrows(log, () -> {
+            ignite.cache(DEFAULT_CACHE_NAME).query(
+                new SqlFieldsQuery("select * from Integer where _key in " +
+                    "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()")
+                    .setLocal(true)
+            ).getAll();
+
+            return null;
+        }, IgniteException.class, "The query was cancelled while executing.");
+
+        // Ensures that there were no exceptions within async cancellation process.
+        cancelRes.get(CHECK_RESULT_TIMEOUT);
     }
 
     /**
-     * Trying to async cancel long running multiple statements query. No exceptions expected.
-     *
-     * @throws Exception If failed.
+     * Check distributed query can be canceled.
      */
     @Test
-    public void testAsyncKillMultipleStatementsQuery() throws Exception {
-        checkKillMultipleStatementsQuery(true);
+    public void testCancelDistributeJoin() throws Exception {
+        IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
+
+        final int ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL = MAX_ROWS - 1;
+
+        GridTestUtils.assertThrows(log, () -> {
+            ignite.cache(DEFAULT_CACHE_NAME).query(
+                new SqlFieldsQuery("SELECT p1.rec_id, p1.id, p2.rec_id " +
+                    "FROM PERS1.Person p1 JOIN PERS2.Person p2 " +
+                    "ON p1.id = p2.id " +
+                    "AND shouldNotBeCalledMoreThan(" + ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL + ")" +
+                    "AND awaitLatchCancelled() = 0")
+                    .setDistributedJoins(true)
+            ).getAll();
+
+            return null;
+        }, CacheException.class, "The query was cancelled while executing.");
+
+        // Ensures that there were no exceptions within async cancellation process.
+        cancelRes.get(CHECK_RESULT_TIMEOUT);
     }
 
     /**
      * Trying to async cancel long running multiple statements query. No exceptions expected.
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception If failed.
      */
-    private void checkKillMultipleStatementsQuery(boolean async) throws Exception {
+    @Test
+    public void testKillMultipleStatementsQuery() throws Exception {
         try (Statement anotherStatement = conn.createStatement()) {
             anotherStatement.setFetchSize(1);
 
@@ -653,7 +715,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
 
             assert rs.next();
 
-            IgniteInternalFuture cancelRes = cancel(3, async, sql);
+            IgniteInternalFuture cancelRes = cancel(3, asyncCancel, sql);
 
             GridTestUtils.assertThrows(log, () -> {
                 // Executes multiple long running query
@@ -677,26 +739,6 @@ public class KillQueryTest extends GridCommonAbstractTest {
      */
     @Test
     public void testCancelBatchQuery() throws Exception {
-        checkCancelBatchQuery(false);
-    }
-
-    /**
-     * Trying to async cancel long running batch query. No exceptions expected.     *
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testAsyncCancelBatchQuery() throws Exception {
-        checkCancelBatchQuery(true);
-    }
-
-    /**
-     * Trying to cancel long running batch query. No exceptions expected.     *
-     *
-     * @param async execute cancellation in ASYNC mode.
-     * @throws Exception If failed.
-     */
-    private void checkCancelBatchQuery(boolean async) throws Exception {
         try (Statement stmt2 = conn.createStatement()) {
             stmt2.setFetchSize(1);
 
@@ -706,7 +748,7 @@ public class KillQueryTest extends GridCommonAbstractTest {
 
             Assert.assertTrue(rs.next());
 
-            IgniteInternalFuture cancelRes = cancel(2, async, sql);
+            IgniteInternalFuture cancelRes = cancel(2, asyncCancel, sql);
 
             GridTestUtils.assertThrows(log, () -> {
                 stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)");
@@ -726,6 +768,169 @@ public class KillQueryTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that if a query hangs (the reducer spins retrying to reserve partitions that can't be reserved), we are
+     * still able to cancel it. The mocked indexing used here simulates 100% inability to reserve partitions.
+     */
+    @Test
+    public void testCancelQueryIfPartitionsCantBeReservedOnMapNodes() throws Exception {
+        // Releases the thread that kills the query when map nodes receive the query request. One map node is enough.
+        GridMessageListener qryStarted = (node, msg, plc) -> {
+            if (msg instanceof GridH2QueryRequest)
+                TestSQLFunctions.cancelLatch.countDown();
+        };
+
+        for (int i = 0; i < NODES_COUNT; i++)
+            grid(i).context().io().addMessageListener(GridTopic.TOPIC_QUERY, qryStarted);
+
+        // Suspends distributed queries on the map nodes.
+        MockedIndexing.failReservations = true;
+
+        try {
+            IgniteInternalFuture cancelFut = cancel(1, asyncCancel);
+
+            GridTestUtils.assertThrows(log, () -> {
+                ignite.cache(DEFAULT_CACHE_NAME).query(
+                    new SqlFieldsQuery("select * from Integer where _val <> 42")
+                ).getAll();
+
+                return null;
+            }, CacheException.class, "The query was cancelled while executing.");
+
+            cancelFut.get(CHECK_RESULT_TIMEOUT);
+        }
+        finally {
+            for (int i = 0; i < NODES_COUNT; i++)
+                grid(i).context().io().removeMessageListener(GridTopic.TOPIC_QUERY, qryStarted);
+        }
+    }
+
+    /**
+     * Checks that a query that hangs because the reducer cannot get nodes for partitions can still be killed.
+     */
+    @Test
+    public void testCancelQueryIfUnableToGetNodesForPartitions() throws Exception {
+        // Force query to spin retrying to get nodes for partitions.
+        MockedIndexing.retryNodePartMapping = true;
+
+        String select = "select * from Integer where _val <> 42";
+
+        IgniteInternalFuture runQueryFut = GridTestUtils.runAsync(() ->
+            ignite.cache(DEFAULT_CACHE_NAME).query(
+                new SqlFieldsQuery(select)
+            ).getAll());
+
+        boolean gotOneFreezedSelect = GridTestUtils.waitForCondition(
+            () -> findQueriesOnNode(select, ignite).size() == 1, TIMEOUT);
+
+        if (!gotOneFreezedSelect) {
+            if (runQueryFut.isDone())
+                printFuturesException("Got exception getting running the query.", runQueryFut);
+
+            Assert.fail("Failed to wait for query to be in running queries list exactly one time " +
+                "[select=" + select + ", node=" + ignite.localNode().id() + ", timeout=" + TIMEOUT + "ms].");
+
+        }
+
+        SqlFieldsQuery killQry = createKillQuery(findOneRunningQuery(select, ignite), asyncCancel);
+
+        ignite.cache(DEFAULT_CACHE_NAME).query(killQry);
+
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> runQueryFut.get(CHECK_RESULT_TIMEOUT),
+            CacheException.class,
+            "The query was cancelled while executing.");
+
+    }
+
+    /**
+     * Prints to the log an exception that was caught on another thread and stored in the specified future.
+     *
+     * @param msg message to add.
+     * @param fut future containing the exception.
+     */
+    private void printFuturesException(String msg, IgniteInternalFuture fut) {
+        try {
+            fut.get(TIMEOUT);
+        }
+        catch (Exception e) {
+            log.error(msg, e);
+        }
+    }
+
+    /**
+     * Tests that a query for which the user explicitly specified partitions is cancelable.
+     *
+     * We check 3 scenarios in which the partitions belong to: 1) only first node <br/> 2) only second node <br/> 3)
+     * some to first, the others to second <br/>
+     */
+    @Test
+    public void testCancelQueryWithPartitions() throws Exception {
+        Affinity<Object> aff = ignite.affinity(DEFAULT_CACHE_NAME);
+
+        int halfOfNodeParts = PARTS_CNT / 4;
+
+        int[] firstParts = stream(aff.primaryPartitions(grid(0).localNode())).limit(halfOfNodeParts).toArray();
+        int[] secondParts = stream(aff.primaryPartitions(grid(1).localNode())).limit(halfOfNodeParts).toArray();
+
+        int[] mixedParts = IntStream.concat(
+            stream(firstParts).limit(halfOfNodeParts),
+            stream(secondParts).limit(halfOfNodeParts)
+        ).toArray();
+
+        checkPartitions(firstParts);
+        checkPartitions(secondParts);
+        checkPartitions(mixedParts);
+    }
+
+    /**
+     * Tests that a query for which the user explicitly specified partitions is cancelable.
+     *
+     * @param partitions user specified partitions, could contain partitions that are mapped on one or both nodes.
+     */
+    public void checkPartitions(int[] partitions) throws Exception {
+        TestSQLFunctions.reset();
+
+        IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
+
+        GridTestUtils.assertThrows(log, () -> {
+            ignite.cache(DEFAULT_CACHE_NAME).query(
+                new SqlFieldsQuery("select * from Integer where _key in " +
+                    "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()")
+                    .setPartitions(partitions)
+            ).getAll();
+
+            return null;
+        }, CacheException.class, "The query was cancelled while executing.");
+
+        // Ensures that there were no exceptions within async cancellation process.
+        cancelRes.get(CHECK_RESULT_TIMEOUT);
+    }
+
+    /**
+     * Wait until all map parts are finished on the specified node. Not needed when IGN-13862 is done.
+     *
+     * @param node node for which map request completion to wait.
+     */
+    private void ensureMapQueriesHasFinished(IgniteEx node) throws Exception {
+        boolean noTasksInQryPool = GridTestUtils.waitForCondition(() -> queryPoolIsEmpty(node), TIMEOUT);
+
+        Assert.assertTrue("Node " + node.localNode().id() + " has not finished its tasks in the query pool",
+            noTasksInQryPool);
+    }
+
+    /**
+     * @param node node which query pool to check.
+     * @return {@code True} if {@link GridIoPolicy#QUERY_POOL} is empty. This means no query tasks are queued and
+     * none are being executed at the moment; {@code false} otherwise.
+     */
+    private boolean queryPoolIsEmpty(IgniteEx node) {
+        ThreadPoolExecutor qryPool = (ThreadPoolExecutor)node.context().getQueryExecutorService();
+
+        return qryPool.getQueue().isEmpty() && qryPool.getActiveCount() == 0;
+    }
+
+    /**
      * Cancels current query which wait on barrier.
      *
      * @param qry Query which need to try cancel.
@@ -795,15 +1000,25 @@ public class KillQueryTest extends GridCommonAbstractTest {
                         ));
                 }
 
+                // This sleep is required to wait for the kill queries to get started.
                 doSleep(500);
 
+                if (expQryNum != runningQueries.size())
+                    log.error("Found running queries are incorrect, " +
+                        "expected only " + expQryNum + " queries. Found : " + runningQueries);
+
                 assertEquals(expQryNum, runningQueries.size());
 
                 TestSQLFunctions.reqLatch.countDown();
 
-                for (IgniteInternalFuture fut : res) {
+                for (IgniteInternalFuture fut : res)
                     fut.get(TIMEOUT);
-                }
+
+                // Currently canceled query returns (unblocks the caller code) without waiting for map parts of the
+                // query to be finished. We need to wait for them. This is a workaround for IGN-13862 because we
+                // observe side effects if map parts of canceled query are still running.
+                ensureMapQueriesHasFinished(grid(0));
+                ensureMapQueriesHasFinished(grid(1));
             }
             catch (Exception e) {
                 log.error("Unexpected exception.", e);
@@ -843,6 +1058,9 @@ public class KillQueryTest extends GridCommonAbstractTest {
      * @param parties the number of threads that must invoke await method before the barrier is tripped
      */
     private static void newBarrier(int parties) {
+        if (barrier != null)
+            barrier.reset();
+
         barrier = new CyclicBarrier(parties);
     }
 
@@ -860,23 +1078,40 @@ public class KillQueryTest extends GridCommonAbstractTest {
      */
     public static class TestSQLFunctions {
         /** Request latch. */
-        static CountDownLatch reqLatch;
+        static volatile CountDownLatch reqLatch;
 
         /** Cancel latch. */
-        static CountDownLatch cancelLatch;
+        static volatile CountDownLatch cancelLatch;
 
         /** Suspend query latch. */
-        static CountDownLatch suspendQryLatch;
+        static volatile CountDownLatch suspendQryLatch;
+
+        /** How many times function {@link #shouldNotBeCalledMoreThan} has been called so far. */
+        static volatile AtomicInteger funCallCnt;
 
         /**
-         * Recreate latches.
+         * Recreate latches. Old latches are released.
          */
-        static void init() {
+        static void reset() {
+            releaseLatches(reqLatch, cancelLatch, suspendQryLatch);
+
             reqLatch = new CountDownLatch(1);
 
             cancelLatch = new CountDownLatch(1);
 
             suspendQryLatch = new CountDownLatch(1);
+
+            funCallCnt = new AtomicInteger(0);
+        }
+
+        /**
+         * @param latches latches to release. Our latches have initial count = 1.
+         */
+        private static void releaseLatches(CountDownLatch... latches) {
+            for (CountDownLatch l : latches) {
+                if (l != null)
+                    l.countDown();
+            }
         }
 
         /**
@@ -915,6 +1150,22 @@ public class KillQueryTest extends GridCommonAbstractTest {
         }
 
         /**
+         * Asserts that this function has been called fewer than the specified number of times; otherwise it fails.
+         * Intended to check that a query is canceled: since cancel is not instant (the query continues to process some
+         * number of rows), it must not process all the rows in the table (in case of a full scan, of course).
+         *
+         * @param times number of calls at which the assertion fails.
+         * @return always {@code true}.
+         */
+        @QuerySqlFunction
+        public static boolean shouldNotBeCalledMoreThan(int times) {
+            if (funCallCnt.incrementAndGet() >= times)
+                fail("Query is running too long since it was canceled.");
+
+            return true;
+        }
+
+        /**
          * If called fails with corresponding message.
          *
          * @return 0;
@@ -1007,4 +1258,99 @@ public class KillQueryTest extends GridCommonAbstractTest {
             return result;
         }
     }
+
+    /**
+     * Mocked indexing to eventually suspend mapper or reducer code. It simulates never-ending PMEs, unstable topologies,
+     * etc.
+     */
+    static class MockedIndexing extends IgniteH2Indexing {
+        /**
+         * While this flag is set to {@code true}, no partitions can be reserved. Acts like normal indexing by
+         * default.
+         */
+        static volatile boolean failReservations;
+
+        /**
+         * While this flag is set to {@code true}, the reducer is not able to get nodes for given partitions because
+         * the mapper says "retry later".
+         */
+        static volatile boolean retryNodePartMapping;
+
+        /**
+         * Result of the mapping partitions to nodes, that indicates that caller should retry request later.
+         */
+        private static final ReducePartitionMapResult RETRY_RESULT = new ReducePartitionMapResult(null, null, null);
+
+        /**
+         * Resets to default behaviour: disable all mock added logic.
+         */
+        static void resetToDefault() {
+            failReservations = false;
+
+            retryNodePartMapping = false;
+        }
+
+        /**
+         * Sets up mock objects in this indexing right after super initialization is done.
+         */
+        @Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
+            super.start(ctx, busyLock);
+
+            setReservationManager(new PartitionReservationManager(super.ctx) {
+                /** {@inheritDoc} */
+                @Override public PartitionReservation reservePartitions(@Nullable List<Integer> cacheIds,
+                    AffinityTopologyVersion reqTopVer, int[] explicitParts, UUID nodeId, long reqId)
+                    throws IgniteCheckedException {
+                    return failReservations ? new PartitionReservation(null,
+                        "[TESTS]: Failed to reserve partitions for the testing purpose!") :
+                        super.reservePartitions(cacheIds, reqTopVer, explicitParts, nodeId, reqId);
+                }
+            });
+
+            setMapper(new ReducePartitionMapper(ctx, ctx.log(GridReduceQueryExecutor.class)) {
+                /** {@inheritDoc} */
+                @Override public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds,
+                    AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly, long qryId) {
+
+                    return retryNodePartMapping ? RETRY_RESULT :
+                        super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryId);
+                }
+            });
+
+        }
+
+        /**
+         * Injects custom {@link PartitionReservationManager} into indexing instance.
+         */
+        protected void setReservationManager(PartitionReservationManager mockMgr) {
+            try {
+                Field partReservationMgrFld = IgniteH2Indexing.class.getDeclaredField("partReservationMgr");
+
+                partReservationMgrFld.setAccessible(true);
+
+                partReservationMgrFld.set(this, mockMgr);
+            }
+            catch (Exception rethrown) {
+                throw new RuntimeException(rethrown);
+            }
+        }
+
+        /**
+         * Injects custom {@link ReducePartitionMapper} into reducer of this indexing instance.
+         */
+        private void setMapper(ReducePartitionMapper mock) {
+            try {
+                GridReduceQueryExecutor rdcExec = this.reduceQueryExecutor();
+
+                Field mapperFld = GridReduceQueryExecutor.class.getDeclaredField("mapper");
+
+                mapperFld.setAccessible(true);
+
+                mapperFld.set(rdcExec, mock);
+            }
+            catch (Exception rethrown) {
+                throw new RuntimeException(rethrown);
+            }
+        }
+    }
 }
diff --git a/modules/yarn/src/main/resources/ignite.properties b/modules/yarn/src/main/resources/ignite.properties
index 57dcd33..90d5787 100644
--- a/modules/yarn/src/main/resources/ignite.properties
+++ b/modules/yarn/src/main/resources/ignite.properties
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-ignite.version=2.7.0-SNAPSHOT
+ignite.version=2.8.0-SNAPSHOT
 ignite.build=0
 ignite.revision=DEV
 ignite.rel.date=01011970