You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2022/05/12 11:34:24 UTC

[ignite] branch master updated: IGNITE-16916 Graceful cancel of job workers instead of brutal interrupt - Fixes #10015.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cbaf6c371d IGNITE-16916 Graceful cancel of job workers instead of brutal interrupt - Fixes #10015.
9cbaf6c371d is described below

commit 9cbaf6c371dd489b2243d1737b0bd325333af79d
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu May 12 14:26:22 2022 +0300

    IGNITE-16916 Graceful cancel of job workers instead of brutal interrupt - Fixes #10015.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 modules/control-utility/pom.xml                    |  14 +
 .../ignite/util/KillCommandsCommandShTest.java     |   3 +
 .../managers/discovery/GridDiscoveryManager.java   |   4 +-
 .../cache/GridCacheSharedTtlCleanupManager.java    |  12 +-
 .../cache/binary/BinaryMetadataFileStore.java      |   2 +-
 .../cache/persistence/checkpoint/Checkpointer.java |   8 +-
 .../persistence/wal/FileWriteAheadLogManager.java  |  22 +-
 .../wal/filehandle/FileHandleManagerImpl.java      |   6 +-
 .../DistributedConfigurationProcessor.java         |   5 +-
 .../internal/processors/job/GridJobProcessor.java  |  36 +-
 .../internal/processors/job/GridJobWorker.java     |  80 +++-
 .../job/JobWorkerInterruptionTimeoutObject.java    |  87 +++++
 .../persistence/DmsDataWriterWorker.java           |   4 +-
 .../processors/timeout/GridTimeoutObject.java      |   6 +-
 .../processors/timeout/GridTimeoutProcessor.java   |   2 +-
 .../ignite/internal/util/StripedExecutor.java      |   2 +-
 .../ignite/internal/util/nio/GridNioServer.java    |   2 +-
 .../ignite/internal/util/worker/GridWorker.java    |  54 ++-
 .../internal/GridCancelOnGridStopSelfTest.java     |   5 +-
 .../internal/GridCancelUnusedJobSelfTest.java      |  30 +-
 .../internal/GridCancelledJobsMetricsSelfTest.java |  32 +-
 .../GridFailoverCustomTopologySelfTest.java        |  27 +-
 .../GridMultithreadedJobStealingSelfTest.java      |   3 +
 .../internal/GridStopWithCancelSelfTest.java       |   3 +
 .../GridTaskFutureImplStopGridSelfTest.java        |  19 +-
 .../internal/metric/SystemViewComputeJobTest.java  |   3 +
 .../processors/compute/ComputeJobStatusTest.java   |   6 +
 .../compute/InterruptComputeJobTest.java           | 416 +++++++++++++++++++++
 ...ridSessionCancelSiblingsFromFutureSelfTest.java |  60 +--
 .../GridSessionCancelSiblingsFromJobSelfTest.java  |  62 +--
 .../GridSessionCancelSiblingsFromTaskSelfTest.java |  64 ++--
 .../apache/ignite/testframework/GridTestUtils.java |  45 ++-
 .../testframework/junits/GridAbstractTest.java     |   2 +-
 .../junits/common/GridCommonAbstractTest.java      |  18 +
 .../junits/spi/GridSpiAbstractTest.java            |   4 +-
 .../testsuites/IgniteComputeGridTestSuite.java     |   4 +-
 .../apache/ignite/util/KillCommandsMXBeanTest.java |   3 +
 .../apache/ignite/util/KillCommandsSQLTest.java    |   3 +
 38 files changed, 949 insertions(+), 209 deletions(-)

diff --git a/modules/control-utility/pom.xml b/modules/control-utility/pom.xml
index 06163ce532a..033e2e7b178 100644
--- a/modules/control-utility/pom.xml
+++ b/modules/control-utility/pom.xml
@@ -130,6 +130,20 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
index 7c844ad9e49..017c2d2cf13 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
@@ -81,6 +81,9 @@ public class KillCommandsCommandShTest extends GridCommandHandlerClusterByClassA
             cache.put(i, i);
 
         awaitPartitionMapExchange();
+
+        // Lower the timeout to reduce the waiting time for interrupting a compute job.
+        computeJobWorkerInterruptTimeout(srvs.get(0)).propagate(100L);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 72f57af2628..f87beacf739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2898,7 +2898,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     if (!isInterruptedException)
                         U.error(log, "Exception in discovery notifier worker thread.", t);
 
-                    if (!isInterruptedException || !isCancelled) {
+                    if (!isInterruptedException || !isCancelled.get()) {
                         FailureType type = t instanceof OutOfMemoryError ? CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION;
 
                         ctx.failure().process(new FailureContext(type, t));
@@ -3078,7 +3078,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     onIdle();
                 }
                 catch (InterruptedException e) {
-                    if (!isCancelled)
+                    if (!isCancelled.get())
                         ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, e));
 
                     throw e;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
index da2b793e242..dc3aa396e8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -164,6 +164,12 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
                         cctx.exchange().affinityReadyFuture(AffinityTopologyVersion.ZERO).get();
                     }
                     catch (IgniteCheckedException ex) {
+                        if (cctx.kernalContext().isStopping()) {
+                            isCancelled.set(true);
+
+                            return; // Node was stopped before affinity was prepared.
+                        }
+
                         throw new IgniteException("Failed to wait for initialization topology [err="
                             + ex.getMessage() + ']', ex);
                     }
@@ -214,13 +220,13 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
             }
             catch (Throwable t) {
                 if (X.hasCause(t, NodeStoppingException.class)) {
-                    isCancelled = true; // Treat node stopping as valid worker cancellation.
+                    isCancelled.set(true); // Treat node stopping as valid worker cancellation.
 
                     return;
                 }
 
                 if (!(t instanceof IgniteInterruptedCheckedException || t instanceof InterruptedException)) {
-                    if (isCancelled)
+                    if (isCancelled.get())
                         return;
 
                     err = t;
@@ -229,7 +235,7 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
                 throw t;
             }
             finally {
-                if (err == null && !isCancelled)
+                if (err == null && !isCancelled.get())
                     err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly");
 
                 if (err instanceof OutOfMemoryError)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
index c412a5dca2b..b994c4b7d06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
@@ -458,7 +458,7 @@ class BinaryMetadataFileStore {
                     body0();
                 }
                 catch (InterruptedException e) {
-                    if (!isCancelled) {
+                    if (!isCancelled.get()) {
                         ctx.failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, e));
 
                         throw e;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index 051d51779bd..457c668964f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -294,7 +294,7 @@ public class Checkpointer extends GridWorker {
             throw t;
         }
         finally {
-            if (err == null && !(isCancelled))
+            if (err == null && !(isCancelled.get()))
                 err = new IllegalStateException("Thread is terminated unexpectedly: " + name());
 
             if (err instanceof OutOfMemoryError)
@@ -826,7 +826,7 @@ public class Checkpointer extends GridWorker {
         catch (InterruptedException ignored) {
             Thread.currentThread().interrupt();
 
-            isCancelled = true;
+            isCancelled.set(true);
         }
     }
 
@@ -887,7 +887,7 @@ public class Checkpointer extends GridWorker {
             log.debug("Cancelling grid runnable: " + this);
 
         // Do not interrupt runner thread.
-        isCancelled = true;
+        isCancelled.set(true);
 
         synchronized (this) {
             notifyAll();
@@ -915,7 +915,7 @@ public class Checkpointer extends GridWorker {
     public void shutdownNow() {
         shutdownNow = true;
 
-        if (!isCancelled)
+        if (!isCancelled.get())
             cancel();
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index f64b11ac8ea..6711b411a7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1890,7 +1890,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          */
         private void shutdown() throws IgniteInterruptedCheckedException {
             synchronized (this) {
-                isCancelled = true;
+                isCancelled.set(true);
 
                 notifyAll();
             }
@@ -1985,7 +1985,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 Thread.currentThread().interrupt();
 
                 synchronized (this) {
-                    isCancelled = true;
+                    isCancelled.set(true);
                 }
             }
             catch (Throwable t) {
@@ -2141,7 +2141,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         public void restart() {
             assert runner() == null : "FileArchiver is still running";
 
-            isCancelled = false;
+            isCancelled.set(false);
 
             new IgniteThread(archiver).start();
         }
@@ -2249,7 +2249,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         void restart() {
             assert runner() == null : "FileCompressorWorker is still running.";
 
-            isCancelled = false;
+            isCancelled.set(false);
 
             new IgniteThread(this).start();
         }
@@ -2530,7 +2530,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                             U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " +
                                 "[tmp=" + unzipTmp + ", raw=" + unzip + "]", e);
                         }
-                        else if (!isCancelled) {
+                        else if (!isCancelled.get()) {
                             ex = new IgniteCheckedException("Error during WAL segment decompression [segmentIdx=" +
                                 segmentToDecompress + "]", e);
                         }
@@ -2549,14 +2549,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
 
-                if (!isCancelled)
+                if (!isCancelled.get())
                     err = e;
             }
             catch (Throwable t) {
                 err = t;
             }
             finally {
-                if (err == null && !isCancelled)
+                if (err == null && !isCancelled.get())
                     err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
 
                 if (err instanceof OutOfMemoryError)
@@ -2605,7 +2605,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         void restart() {
             assert runner() == null : "FileDecompressor is still running.";
 
-            isCancelled = false;
+            isCancelled.set(false);
 
             new IgniteThread(this).start();
         }
@@ -3292,7 +3292,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             catch (IgniteInterruptedCheckedException e) {
                 Thread.currentThread().interrupt();
 
-                isCancelled = true;
+                isCancelled.set(true);
             }
             catch (Throwable t) {
                 err = t;
@@ -3314,7 +3314,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @throws IgniteInterruptedCheckedException If failed to wait for worker shutdown.
          */
         private void shutdown() throws IgniteInterruptedCheckedException {
-            isCancelled = true;
+            isCancelled.set(true);
 
             U.join(this);
         }
@@ -3325,7 +3325,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         public void restart() {
             assert runner() == null : "FileCleaner is still running";
 
-            isCancelled = false;
+            isCancelled.set(false);
 
             new IgniteThread(this).start();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index a836be2fc5c..b901118a34f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -403,7 +403,7 @@ public class FileHandleManagerImpl implements FileHandleManager {
 
                 unparkWaiters(MAX_VALUE);
 
-                if (err == null && !isCancelled)
+                if (err == null && !isCancelled.get())
                     err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
 
                 if (err instanceof OutOfMemoryError)
@@ -583,7 +583,7 @@ public class FileHandleManagerImpl implements FileHandleManager {
         public void restart() {
             assert runner() == null : "WALWriter is still running.";
 
-            isCancelled = false;
+            isCancelled.set(false);
 
             new IgniteThread(this).start();
         }
@@ -640,7 +640,7 @@ public class FileHandleManagerImpl implements FileHandleManager {
         public void restart() {
             assert runner() == null : "WalSegmentSyncer is running.";
 
-            isCancelled = false;
+            isCancelled.set(false);
 
             new IgniteThread(walSegmentSyncWorker).start();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
index 5316f627c99..5bae8fb052f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageL
 import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.ACTUALIZE;
 import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.CLUSTER_WIDE_UPDATE;
@@ -124,7 +125,7 @@ public class DistributedConfigurationProcessor extends GridProcessorAdapter impl
      * @param propKey Key of specific property.
      * @return Property key for meta storage.
      */
-    private static String toMetaStorageKey(String propKey) {
+    public static String toMetaStorageKey(String propKey) {
         return DIST_CONF_PREFIX + propKey;
     }
 
@@ -169,7 +170,7 @@ public class DistributedConfigurationProcessor extends GridProcessorAdapter impl
      * @param name Property name.
      * @return Public property.
      */
-    public DistributedChangeableProperty<Serializable> property(String name) {
+    public @Nullable DistributedChangeableProperty<Serializable> property(String name) {
         DistributedChangeableProperty<?> p = props.get(name);
 
         if (!(p instanceof DistributedChangeableProperty))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 122b2ea66e7..71e62477676 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
@@ -87,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
@@ -113,9 +115,11 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
 import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
 import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
 import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty.detachedLongProperty;
 import static org.apache.ignite.internal.processors.metric.GridMetricManager.CPU_LOAD;
 import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
 import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
@@ -166,6 +170,12 @@ public class GridJobProcessor extends GridProcessorAdapter {
     /** Total jobs waiting time metric name. */
     public static final String WAITING_TIME = "WaitingTime";
 
+    /**
+     * Distributed property that defines the timeout for interrupting the
+     * {@link GridJobWorker worker} after {@link GridJobWorker#cancel() cancellation}, in milliseconds.
+     */
+    public static final String COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT = "computeJobWorkerInterruptTimeout";
+
     /** */
     private final Marshaller marsh;
 
@@ -312,6 +322,10 @@ public class GridJobProcessor extends GridProcessorAdapter {
      */
     @Nullable private final String jobPriAttrKey;
 
+    /** Timeout, in milliseconds, for interrupting {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel}. */
+    private final DistributedLongProperty computeJobWorkerInterruptTimeout =
+        detachedLongProperty(COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT);
+
     /**
      * @param ctx Kernal context.
      */
@@ -378,6 +392,15 @@ public class GridJobProcessor extends GridProcessorAdapter {
             taskPriAttrKey = null;
             jobPriAttrKey = null;
         }
+
+        ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
+            computeJobWorkerInterruptTimeout.addListener(makeUpdateListener(
+                "Compute job parameter '%s' was changed from '%s' to '%s'",
+                log
+            ));
+
+            dispatcher.registerProperty(computeJobWorkerInterruptTimeout);
+        });
     }
 
     /** {@inheritDoc} */
@@ -878,7 +901,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      * In most cases this method should be called from main read lock
      * to avoid jobs activation after node stop has started.
      */
-    private void handleCollisions() {
+    public void handleCollisions() {
         assert !jobAlwaysActivate;
 
         if (handlingCollision.get()) {
@@ -1327,7 +1350,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
                         holdLsnr,
                         partsReservation,
                         req.getTopVer(),
-                        req.executorName());
+                        req.executorName(),
+                        this::computeJobWorkerInterruptTimeout
+                    );
 
                     jobCtx.job(job);
 
@@ -2414,4 +2439,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
         else
             return w -> sesId.equals(w.getSession().getId()) && jobId.equals(w.getJobId());
     }
+
+    /**
+     * @return Interruption timeout of {@link GridJobWorker workers} (in millis) after {@link GridWorker#cancel cancel} is called.
+     */
+    public long computeJobWorkerInterruptTimeout() {
+        return computeJobWorkerInterruptTimeout.getOrDefault(ctx.config().getFailureDetectionTimeout());
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 0495c6ef8ec..e7b0caea536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -92,11 +93,7 @@ import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.SUS
  */
 public class GridJobWorker extends GridWorker implements GridTimeoutObject {
     /** Per-thread held flag. */
-    private static final ThreadLocal<Boolean> HOLD = new ThreadLocal<Boolean>() {
-        @Override protected Boolean initialValue() {
-            return false;
-        }
-    };
+    private static final ThreadLocal<Boolean> HOLD = ThreadLocal.withInitial(() -> false);
 
     /** Static logger to avoid re-creation. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -189,6 +186,13 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
     private volatile ComputeJobStatusEnum status = QUEUED;
 
     /**
+     * Supplier of the timeout, in milliseconds, for interrupting {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel}.
+     */
+    private final LongSupplier jobInterruptTimeoutSupplier;
+
+    /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      * @param dep Grid deployment.
      * @param createTime Create time.
@@ -203,6 +207,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      * @param partsReservation Reserved partitions (must be released at the job finish).
      * @param reqTopVer Affinity topology version of the job request.
      * @param execName Custom executor name.
+     * @param jobInterruptTimeoutSupplier Supplier of the timeout, in milliseconds, for interrupting
+     *      {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel}.
      */
     GridJobWorker(
         GridKernalContext ctx,
@@ -218,10 +224,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         GridJobHoldListener holdLsnr,
         GridReservable partsReservation,
         AffinityTopologyVersion reqTopVer,
-        String execName) {
+        String execName,
+        LongSupplier jobInterruptTimeoutSupplier
+    ) {
         super(ctx.igniteInstanceName(), "grid-job-worker", ctx.log(GridJobWorker.class));
 
-        assert ctx != null;
         assert ses != null;
         assert jobCtx != null;
         assert taskNode != null;
@@ -242,6 +249,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         this.partsReservation = partsReservation;
         this.reqTopVer = reqTopVer;
         this.execName = execName;
+        this.jobInterruptTimeoutSupplier = jobInterruptTimeoutSupplier;
 
         if (job != null)
             this.job = job;
@@ -1160,6 +1168,64 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         return jobId.hashCode();
     }
 
+    /** {@inheritDoc} */
+    @Override protected void onCancel(boolean firstCancelRequest) {
+        if (firstCancelRequest)
+            handleCancel();
+        else {
+            if (log.isDebugEnabled()) {
+                Thread runner = runner();
+
+                log.debug(String.format(
+                    "Worker cancellation is ignored [jobId=%s, interrupted=%s]",
+                    getJobId(),
+                    runner == null ? "unknown" : runner.isInterrupted()
+                ));
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onCancelledBeforeWorkerScheduled() {
+        // To ensure that the worker does not hang if the cancellation was before the start of its execution.
+        handleCancel();
+    }
+
+    /**
+     * Handles the cancellation of the worker.
+     */
+    private void handleCancel() {
+        long timeout = jobInterruptTimeoutSupplier.getAsLong();
+
+        if (timeout > 0) {
+            ctx.timeout().addTimeoutObject(
+                new JobWorkerInterruptionTimeoutObject(this, U.currentTimeMillis() + timeout)
+            );
+
+            if (log.isDebugEnabled()) {
+                log.debug(String.format(
+                    "Worker will be interrupted later [jobId=%s, timeout=%s]",
+                    getJobId(),
+                    U.humanReadableDuration(timeout)
+                ));
+            }
+        }
+        else {
+            Thread runner = runner();
+
+            if (runner != null)
+                runner.interrupt();
+
+            if (log.isDebugEnabled()) {
+                log.debug(String.format(
+                    "Worker is interrupted on cancel [jobId=%s, interrupted=%s]",
+                    getJobId(),
+                    runner == null ? "unknown" : runner.isInterrupted()
+                ));
+            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridJobWorker.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/JobWorkerInterruptionTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/JobWorkerInterruptionTimeoutObject.java
new file mode 100644
index 00000000000..3e5b37bf69e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/JobWorkerInterruptionTimeoutObject.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.job;
+
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Timeout object for delayed {@link GridJobWorker worker} interruption.
+ *
+ * <p>After calling {@link GridJobWorker#cancel} the worker should try to complete gracefully,
+ * if it does not, its runner thread is {@link Thread#interrupt interrupted} once this timeout object fires.
+ */
+public class JobWorkerInterruptionTimeoutObject implements GridTimeoutObject {
+    /** Compute job worker. */
+    private final GridJobWorker jobWorker;
+
+    /** ID of timeout object. */
+    private final IgniteUuid id;
+
+    /** Time when the timeout object should be executed, in milliseconds. */
+    private final long endTime;
+
+    /**
+     * Constructor.
+     *
+     * @param jobWorker Compute job worker.
+     * @param endTime Time when the timeout object should be executed, in milliseconds.
+     */
+    public JobWorkerInterruptionTimeoutObject(
+        GridJobWorker jobWorker,
+        long endTime
+    ) {
+        this.jobWorker = jobWorker;
+        this.endTime = endTime;
+
+        id = IgniteUuid.randomUuid();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid timeoutId() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return endTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        assert jobWorker.isCancelled() : jobWorker;
+
+        Thread runner = jobWorker.runner();
+
+        if (runner != null && !jobWorker.isDone())
+            runner.interrupt();
+    }
+
+    /**
+     * @return Compute job worker.
+     */
+    public GridJobWorker jobWorker() {
+        return jobWorker;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(JobWorkerInterruptionTimeoutObject.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
index ebc5c73bc5b..bb973857e8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
@@ -100,7 +100,7 @@ public class DmsDataWriterWorker extends GridWorker {
 
     /** Start new distributed metastorage worker thread. */
     public void start() {
-        isCancelled = false;
+        isCancelled.set(false);
 
         new IgniteThread(igniteInstanceName(), "dms-writer-thread", this).start();
     }
@@ -187,7 +187,7 @@ public class DmsDataWriterWorker extends GridWorker {
         updateQueue.offer(new FutureTask<>(() -> STOP));
         latch.countDown();
 
-        isCancelled = true;
+        isCancelled.set(true);
 
         Thread runner = runner();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java
index 692bad5aa1d..7f7b730126a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java
@@ -26,15 +26,15 @@ public interface GridTimeoutObject {
     /**
      * @return ID of the object.
      */
-    public IgniteUuid timeoutId();
+    IgniteUuid timeoutId();
 
     /**
      * @return End time.
      */
-    public long endTime();
+    long endTime();
 
     /**
      * Timeout callback.
      */
-    public void onTimeout();
+    void onTimeout();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 7efcea9c8a8..f0b3d3b94ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -296,7 +296,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
                 throw t;
             }
             finally {
-                if (err == null && !isCancelled)
+                if (err == null && !isCancelled.get())
                     err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly.");
 
                 if (err instanceof OutOfMemoryError)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 50d9b9ef399..bff78e8a3d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -658,7 +658,7 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
                 }
             }
 
-            if (!isCancelled) {
+            if (!isCancelled.get()) {
                 errHnd.apply(new IllegalStateException("Thread " + Thread.currentThread().getName() +
                     " is terminated unexpectedly"));
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 50ff443a9d3..2f3e8e55295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2276,7 +2276,7 @@ public class GridNioServer<T> {
                         }
 
                         // select() call above doesn't throw on interruption; checking it here to propagate timely.
-                        if (!closed && !isCancelled && Thread.interrupted())
+                        if (!closed && !isCancelled.get() && Thread.interrupted())
                             throw new InterruptedException();
                     }
                     finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
index 615d5062a94..85959950f33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.worker;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
@@ -49,7 +50,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
     private volatile boolean finished;
 
     /** Whether or not this runnable is cancelled. */
-    protected volatile boolean isCancelled;
+    protected final AtomicBoolean isCancelled = new AtomicBoolean();
 
     /** Actual thread runner. */
     private volatile Thread runner;
@@ -114,9 +115,8 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
             log.debug("Grid runnable started: " + name);
 
         try {
-            // Special case, when task gets cancelled before it got scheduled.
-            if (isCancelled)
-                runner.interrupt();
+            if (isCancelled.get())
+                onCancelledBeforeWorkerScheduled();
 
             // Listener callback.
             if (lsnr != null)
@@ -160,7 +160,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
                 lsnr.onStopped(this);
 
             if (log.isDebugEnabled())
-                if (isCancelled)
+                if (isCancelled.get())
                     log.debug("Grid runnable finished due to cancellation: " + name);
                 else if (runner.isInterrupted())
                     log.debug("Grid runnable finished due to interruption without cancellation: " + name);
@@ -191,9 +191,9 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
     }
 
     /**
-     * @return Runner thread.
+     * @return Runner thread, {@code null} if the worker has not yet started executing.
      */
-    public Thread runner() {
+    public @Nullable Thread runner() {
         return runner;
     }
 
@@ -216,20 +216,13 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
     }
 
     /**
-     * Cancels this runnable interrupting actual runner.
+     * Cancels this runnable.
      */
     public void cancel() {
         if (log.isDebugEnabled())
             log.debug("Cancelling grid runnable: " + this);
 
-        isCancelled = true;
-
-        Thread runner = this.runner;
-
-        // Cannot apply Future.cancel() because if we do, then Future.get() would always
-        // throw CancellationException and we would not be able to wait for task completion.
-        if (runner != null)
-            runner.interrupt();
+        onCancel(isCancelled.compareAndSet(false, true));
     }
 
     /**
@@ -241,7 +234,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
         if (log.isDebugEnabled())
             log.debug("Joining grid runnable: " + this);
 
-        if ((runner == null && isCancelled) || finished)
+        if ((runner == null && isCancelled.get()) || finished)
             return;
 
         synchronized (mux) {
@@ -259,7 +252,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
     public boolean isCancelled() {
         Thread runner = this.runner;
 
-        return isCancelled || (runner != null && runner.isInterrupted());
+        return isCancelled.get() || (runner != null && runner.isInterrupted());
     }
 
     /**
@@ -306,6 +299,31 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
             lsnr.onIdle(this);
     }
 
+    /**
+     * Callback on worker cancellation; this default implementation interrupts the runner thread, if there is one.
+     *
+     * @param firstCancelRequest Flag indicating that worker cancellation was requested for the first time.
+     */
+    protected void onCancel(boolean firstCancelRequest) {
+        Thread runner = this.runner;
+
+        // Cannot apply Future.cancel() because if we do, then Future.get() would always
+        // throw CancellationException, and we would not be able to wait for task completion.
+        if (runner != null)
+            runner.interrupt();
+    }
+
+    /**
+     * Callback on special case, when task is cancelled before it has been scheduled.
+     */
+    protected void onCancelledBeforeWorkerScheduled() {
+        Thread runner = this.runner;
+
+        assert runner != null : this;
+
+        runner.interrupt();
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         Thread runner = this.runner;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
index b921106d13c..a11ecd762ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
@@ -60,6 +60,9 @@ public class GridCancelOnGridStopSelfTest extends GridCommonAbstractTest {
         cancelCall = false;
 
         try (Ignite g = startGrid(1)) {
+            // We change it because compute jobs will go to sleep.
+            assertTrue(computeJobWorkerInterruptTimeout(g).propagate(10L));
+
             cnt = new CountDownLatch(1);
 
             g.compute().executeAsync(CancelledTask.class, null);
@@ -67,7 +70,7 @@ public class GridCancelOnGridStopSelfTest extends GridCommonAbstractTest {
             cnt.await();
         }
 
-        assert cancelCall;
+        assertTrue(cancelCall);
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java
index 1c44ad125b6..564793b93ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java
@@ -43,6 +43,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.junit.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
 /**
  * Cancel unused job test.
  */
@@ -93,31 +96,32 @@ public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest {
     public void testCancel() throws Exception {
         Ignite ignite = G.ignite(getTestIgniteInstanceName());
 
+        // We change it because compute jobs will go to sleep.
+        assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
+
         ignite.compute().localDeployTask(GridCancelTestTask.class, U.detectClassLoader(GridCancelTestTask.class));
 
         ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridCancelTestTask.class.getName(), null);
 
-        // Wait until jobs begin execution.
-        boolean await = startSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+        assertNotNull(fut);
 
-        assert await : "Jobs did not start.";
+        // Wait until jobs begin execution.
+        assertTrue("Jobs did not start.", startSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS));
 
         info("Test task result: " + fut);
 
-        assert fut != null;
-
         // Only first job should successfully complete.
-        Object res = fut.get();
-        assert (Integer)res == 1;
+        assertThat(fut.get(getTestTimeout()), equalTo(1));
 
         // Wait for all jobs to finish.
-        await = stopSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS);
-        assert await : "Jobs did not stop.";
+        assertTrue("Jobs did not stop.", stopSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS));
 
         // One is definitely processed. But there might be some more processed or cancelled or processed and cancelled.
         // Thus total number should be at least SPLIT_COUNT and at most (SPLIT_COUNT - 1) *2 +1
-        assert (cancelCnt + processedCnt) >= SPLIT_COUNT && (cancelCnt + processedCnt) <= (SPLIT_COUNT - 1) * 2 + 1 :
-            "Invalid cancel count value: " + cancelCnt;
+        assertTrue(
+            "Invalid cancel count value: " + cancelCnt,
+            (cancelCnt + processedCnt) >= SPLIT_COUNT && (cancelCnt + processedCnt) <= (SPLIT_COUNT - 1) * 2 + 1
+        );
     }
 
     /**
@@ -171,6 +175,8 @@ public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest {
         private ComputeTaskSession ses;
 
         /**
+         * Constructor.
+         *
          * @param arg Argument.
          */
         private GridCancelTestJob(Integer arg) {
@@ -179,7 +185,7 @@ public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public Serializable execute() {
-            int arg = this.<Integer>argument(0);
+            int arg = argument(0);
 
             try {
                 if (log.isInfoEnabled())
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java
index f5c7fc9175f..0547a01bc71 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java
@@ -31,7 +31,6 @@ import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.compute.ComputeTaskSplitAdapter;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.resources.LoggerResource;
@@ -44,11 +43,14 @@ import org.apache.ignite.spi.collision.CollisionJobContext;
 import org.apache.ignite.spi.collision.CollisionSpi;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.junit.Test;
 
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
 /**
  * Cancelled jobs metrics self test.
  */
@@ -85,25 +87,24 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
     public void testCancelledJobs() throws Exception {
         final Ignite ignite = G.ignite(getTestIgniteInstanceName());
 
+        // Shorten the job worker interrupt timeout because the compute jobs in this test sleep until they are interrupted.
+        assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
+
         Collection<ComputeTaskFuture<?>> futs = new ArrayList<>();
 
         for (int i = 1; i <= 10; i++)
             futs.add(ignite.compute().executeAsync(CancelledTask.class, null));
 
         // Wait to be sure that metrics were updated.
-        GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return ignite.cluster().localNode().metrics().getTotalCancelledJobs() > 0;
-            }
-        }, 5000);
+        waitForCondition(() -> ignite.cluster().localNode().metrics().getTotalCancelledJobs() > 0, 5000);
 
         colSpi.externalCollision();
 
         for (ComputeTaskFuture<?> fut : futs) {
             try {
-                fut.get();
+                fut.get(getTestTimeout());
 
-                assert false : "Job was not interrupted.";
+                fail("Job was not interrupted.");
             }
             catch (IgniteException e) {
                 if (e.hasCause(InterruptedException.class))
@@ -114,9 +115,7 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
         }
 
         // Job was cancelled and now we need to calculate metrics.
-        int totalCancelledJobs = ignite.cluster().localNode().metrics().getTotalCancelledJobs();
-
-        assert totalCancelledJobs == 10 : "Metrics were not updated. Expected 10 got " + totalCancelledJobs;
+        assertThat(ignite.cluster().localNode().metrics().getTotalCancelledJobs(), equalTo(10));
     }
 
     /**
@@ -130,7 +129,7 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public Object reduce(List<ComputeJobResult> results) {
-            assert results.get(0).isCancelled() : "Wrong job result status.";
+            assertFalse(results.get(0).isCancelled());
 
             return null;
         }
@@ -148,13 +147,6 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
                 Thread.sleep(Long.MAX_VALUE);
             }
             catch (InterruptedException ignored) {
-                try {
-                    Thread.sleep(1000);
-                }
-                catch (InterruptedException e1) {
-                    throw new IgniteException("Unexpected exception: ", e1);
-                }
-
                 throw new IgniteException("Job got interrupted while waiting for cancellation.");
             }
             finally {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java
index 73e53bf15af..fc8371bf081 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java
@@ -43,6 +43,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
 /**
  * Test failover and custom topology. Topology returns local node if remote node fails.
  */
@@ -87,11 +90,10 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
     @Test
     public void testFailoverTopology() throws Exception {
         try {
-            Ignite ignite1 = startGrid(1);
-            Ignite ignite2 = startGrid(2);
+            Ignite ignite1 = startGrids(2);
 
-            assert ignite1 != null;
-            assert ignite2 != null;
+            // We change it because compute jobs will go to sleep.
+            assertTrue(computeJobWorkerInterruptTimeout(ignite1).propagate(10L));
 
             ignite1.compute().localDeployTask(JobTask.class, JobTask.class.getClassLoader());
 
@@ -104,9 +106,9 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
                     mux.wait();
                 }
 
-                stopAndCancelGrid(2);
+                stopAndCancelGrid(1);
 
-                String res = fut.get();
+                String res = fut.get(getTestTimeout());
 
                 info("Task result: " + res);
             }
@@ -116,13 +118,10 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
 
             info("Failed over: " + failCnt.get());
 
-            assert failCnt.get() == 1 : "Invalid fail over counter [expected=1, actual=" + failCnt.get() + ']';
+            assertThat(failCnt.get(), equalTo(1));
         }
         finally {
-            stopGrid(1);
-
-            // Stopping stopped instance just in case.
-            stopGrid(2);
+            stopAllGrids();
         }
     }
 
@@ -139,11 +138,11 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) {
-            assert ignite != null;
+            assertNotNull(ignite);
 
             UUID locNodeId = ignite.configuration().getNodeId();
 
-            assert locNodeId != null;
+            assertNotNull(locNodeId);
 
             if (log.isInfoEnabled())
                 log.info("Mapping jobs [subgrid=" + subgrid + ", arg=" + arg + ']');
@@ -167,7 +166,7 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
 
                     UUID nodeId = ignite.configuration().getNodeId();
 
-                    assert nodeId != null;
+                    assertNotNull(nodeId);
 
                     if (!nodeId.equals(argument(0))) {
                         try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
index be451625462..2ede9cbb4fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
@@ -73,6 +73,9 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
         stopAllGrids();
 
         ignite = startGridsMultiThreaded(2);
+
+        // We are changing it because compute jobs fall asleep.
+        assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java
index 522ed7786a8..9fc28dbca87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java
@@ -71,6 +71,9 @@ public class GridStopWithCancelSelfTest extends GridCommonAbstractTest {
         try {
             Ignite ignite = startGrid("testGrid");
 
+            // We are changing it because compute jobs fall asleep.
+            assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
+
             executeAsync(ignite.compute(), CancelledTask.class, null);
 
             cnt.await();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java
index 5f9d9bd5669..606b5d26ff5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java
@@ -40,6 +40,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.junit.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+
 /**
  * Test for task future when grid stops.
  */
@@ -72,17 +75,17 @@ public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest {
     public void testGet() throws Exception {
         Ignite ignite = startGrid(getTestIgniteInstanceName());
 
+        // We change it because compute jobs fall asleep.
+        assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
+
         Thread futThread = null;
 
         try {
             final ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridStopTestTask.class.getName(), null);
 
-            fut.listen(new CI1<IgniteFuture>() {
-                @SuppressWarnings({"NakedNotify"})
-                @Override public void apply(IgniteFuture gridFut) {
-                    synchronized (mux) {
-                        mux.notifyAll();
-                    }
+            fut.listen((CI1<IgniteFuture>)gridFut -> {
+                synchronized (mux) {
+                    mux.notifyAll();
                 }
             });
 
@@ -104,7 +107,7 @@ public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest {
                         failed.set(true);
 
                         // Make sure that message contains info about stopping grid.
-                        assert e.getMessage().startsWith("Task failed due to stopping of the grid:");
+                        assertThat(e.getMessage(), startsWith("Task failed due to stopping of the grid:"));
                     }
                     finally {
                         latch.countDown();
@@ -135,7 +138,7 @@ public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest {
 
             info("Test task result [failed=" + failed.get() + ", taskFuture=" + fut + ']');
 
-            assert finished : "Future thread was not stopped.";
+            assertTrue(finished);
 
             assert fut.isDone();
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java
index f2475eab96a..6bd6daf901f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java
@@ -97,6 +97,9 @@ public class SystemViewComputeJobTest extends GridCommonAbstractTest {
         cache = server.createCache("test-cache");
 
         cache.put(1, 1);
+
+        // We are changing it because compute jobs fall asleep.
+        assertTrue(computeJobWorkerInterruptTimeout(server).propagate(10L));
     }
 
     /** Tests work of {@link SystemView} for compute grid {@link IgniteCompute#broadcastAsync(IgniteRunnable)} call. */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
index 17c783dc98a..e6ece4a4b3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
@@ -91,6 +91,9 @@ public class ComputeJobStatusTest extends GridCommonAbstractTest {
 
         node0 = crd;
         node1 = grid(1);
+
+        // We are changing it because compute jobs fall asleep.
+        assertTrue(computeJobWorkerInterruptTimeout(node0).propagate(10L));
     }
 
     /** {@inheritDoc} */
@@ -258,6 +261,9 @@ public class ComputeJobStatusTest extends GridCommonAbstractTest {
             U.sleep(100);
 
             checkTaskJobStatuses(sesId, FINISHED, null);
+
+            // Let's wait a bit for the callcc (above) to complete.
+            U.sleep(100);
         }
 
         // Let's check that the job (WaitJob) on the node0 has finished
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java
new file mode 100644
index 00000000000..0bd42eae51f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobSibling;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.internal.processors.job.GridJobWorker;
+import org.apache.ignite.internal.processors.job.JobWorkerInterruptionTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.toMetaStorageKey;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+/**
+ * {@link GridJobWorker} interrupt testing.
+ */
+public class InterruptComputeJobTest extends GridCommonAbstractTest {
+    /** Node. */
+    private static IgniteEx node;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        stopAllGrids();
+
+        node = startGrid();
+
+        node.cluster().state(ACTIVE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+
+        node = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        PriorityQueueCollisionSpiEx.collisionSpiEx(node).handleCollision = true;
+
+        // Reset distributed property.
+        node.context().distributedMetastorage().remove(
+            toMetaStorageKey(computeJobWorkerInterruptTimeout(node).getName())
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        // Cleanup with task release.
+        CountDownLatchJob.JOBS.removeIf(job -> {
+            job.latch.countDown();
+
+            return true;
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName).setCollisionSpi(new PriorityQueueCollisionSpiEx());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new StopNodeFailureHandler();
+    }
+
+    /**
+     * Checks that method {@link GridJobProcessor#computeJobWorkerInterruptTimeout()}
+     * returns a valid value that depends on distributed property "computeJobWorkerInterruptTimeout".
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testComputeJobWorkerInterruptTimeoutProperty() throws Exception {
+        // Check default.
+        assertThat(
+            node.context().job().computeJobWorkerInterruptTimeout(),
+            equalTo(node.context().config().getFailureDetectionTimeout())
+        );
+
+        // Check update value.
+        computeJobWorkerInterruptTimeout(node).propagate(100500L);
+
+        assertThat(node.context().job().computeJobWorkerInterruptTimeout(), equalTo(100500L));
+    }
+
+    /**
+     * Checks that when {@link GridJobWorker#cancel()} (even twice) is called, the {@link GridJobWorker#runner()}
+     * is not interrupted and that only one {@link JobWorkerInterruptionTimeoutObject} is created.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancel() throws Exception {
+        computeJobWorkerInterruptTimeout(node).propagate(TimeUnit.HOURS.toMillis(1));
+
+        ComputeTaskFuture<Void> taskFut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);
+
+        GridJobWorker jobWorker = jobWorker(node, taskFut.getTaskSession());
+
+        cancelWitchChecks(jobWorker);
+
+        cancelWitchChecks(jobWorker);
+
+        ((CountDownLatchJob)jobWorker.getJob()).latch.countDown();
+
+        taskFut.get(getTestTimeout());
+    }
+
+    /**
+     * Checks that after {@link GridJobWorker#cancel()}, the {@link JobWorkerInterruptionTimeoutObject}
+     * will trigger the {@link Thread#interrupt()}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testInterrupt() throws Exception {
+        computeJobWorkerInterruptTimeout(node).propagate(100L);
+
+        ComputeTaskFuture<Void> taskFut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);
+
+        GridJobWorker jobWorker = jobWorker(node, taskFut.getTaskSession());
+
+        cancelWitchChecks(jobWorker);
+
+        // We are waiting for the JobWorkerInterruptionTimeoutObject to interrupt the worker.
+        taskFut.get(1_000L);
+
+        assertThat(jobWorker.isCancelled(), equalTo(true));
+        assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(true));
+        assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), empty());
+    }
+
+    /**
+     * Checks that if the worker was {@link GridJobWorker#cancel()} (even twice) before starting work,
+     * then it will be canceled, not interrupted, and have one {@link JobWorkerInterruptionTimeoutObject} before
+     * and two after the start.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelBeforeStart() throws Exception {
+        PriorityQueueCollisionSpiEx.collisionSpiEx(node).handleCollision = false;
+
+        computeJobWorkerInterruptTimeout(node).propagate(TimeUnit.HOURS.toMillis(1));
+
+        ComputeTaskFuture<Void> taskFut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);
+
+        GridJobWorker jobWorker = jobWorker(node, taskFut.getTaskSession());
+
+        cancelBeforeStartWitchChecks(jobWorker);
+
+        cancelBeforeStartWitchChecks(jobWorker);
+
+        PriorityQueueCollisionSpiEx.collisionSpiEx(node).handleCollision = true;
+
+        node.context().job().handleCollisions();
+
+        assertTrue(waitForCondition(jobWorker::isStarted, getTestTimeout(), 10));
+
+        assertThat(jobWorker.isCancelled(), equalTo(true));
+        assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(false));
+        assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), hasSize(2));
+    }
+
+    /**
+     * @param n Node.
+     * @param taskSession Task session.
+     * @return Job worker, which is expected to be the only one and is either active or passive.
+     */
+    private static GridJobWorker jobWorker(IgniteEx n, ComputeTaskSession taskSession) {
+        Collection<ComputeJobSibling> siblings = taskSession.getJobSiblings();
+
+        assertThat(siblings, hasSize(1));
+
+        IgniteUuid jobId = F.first(siblings).getJobId();
+
+        GridJobWorker jobWorker = n.context().job().activeJob(jobId);
+
+        if (jobWorker == null) {
+            Map<IgniteUuid, GridJobWorker> passiveJobs = getFieldValue(n.context().job(), "passiveJobs");
+
+            if (passiveJobs != null)
+                jobWorker = passiveJobs.get(jobId);
+        }
+
+        assertThat(jobWorker, notNullValue());
+
+        return jobWorker;
+    }
+
+    /**
+     * Cancels the worker, checking that it is canceled and not interrupted and only one interrupter is added.
+     *
+     * @param jobWorker Compute job worker.
+     * @throws Exception If failed.
+     */
+    private void cancelWitchChecks(GridJobWorker jobWorker) throws Exception {
+        assertTrue(waitForCondition(jobWorker::isStarted, getTestTimeout(), 10));
+
+        jobWorker.cancel();
+
+        assertThat(jobWorker.isCancelled(), equalTo(true));
+        assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(false));
+        assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), hasSize(1));
+    }
+
+    /**
+     * Cancels the worker before it starts, checking that it is canceled and that exactly one interrupter is added.
+     *
+     * @param jobWorker Compute job worker.
+     */
+    private void cancelBeforeStartWitchChecks(GridJobWorker jobWorker) {
+        jobWorker.cancel();
+
+        assertThat(jobWorker.isStarted(), equalTo(false));
+        assertThat(jobWorker.runner(), nullValue());
+
+        assertThat(jobWorker.isCancelled(), equalTo(true));
+        assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), hasSize(1));
+    }
+
+    /**
+     * @param n Node.
+     * @return Value of {@code GridTimeoutProcessor#timeoutObjs}.
+     */
+    private static GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjects(IgniteEx n) {
+        return getFieldValue(n.context().timeout(), "timeoutObjs");
+    }
+
+    /**
+     * @param timeoutObjects Value of {@code GridTimeoutProcessor#timeoutObjs}.
+     * @param jobWorker Compute job worker.
+     * @return Collection of {@link JobWorkerInterruptionTimeoutObject} for {@code jobWorker}.
+     */
+    private static Collection<JobWorkerInterruptionTimeoutObject> jobWorkerInterrupters(
+        GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjects,
+        GridJobWorker jobWorker
+    ) {
+        return timeoutObjects.stream()
+            .filter(JobWorkerInterruptionTimeoutObject.class::isInstance)
+            .map(JobWorkerInterruptionTimeoutObject.class::cast)
+            .filter(o -> o.jobWorker() == jobWorker)
+            .collect(toList());
+    }
+
+    /**
+     * @return Value of {@link CountDownLatchJob#interrupted}.
+     */
+    private static boolean countDownLatchJobInterrupted(GridJobWorker jobWorker) {
+        return ((CountDownLatchJob)jobWorker.getJob()).interrupted;
+    }
+
+    /**
+     * Test extension {@link PriorityQueueCollisionSpi}.
+     */
+    private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
+        /** Collision handling flag. */
+        volatile boolean handleCollision = true;
+
+        /** {@inheritDoc} */
+        @Override public void onCollision(CollisionContext ctx) {
+            if (handleCollision)
+                super.onCollision(ctx);
+        }
+
+        /**
+         * @param n Node.
+         * @return Test extension {@link PriorityQueueCollisionSpi}.
+         */
+        static PriorityQueueCollisionSpiEx collisionSpiEx(IgniteEx n) {
+            return ((PriorityQueueCollisionSpiEx)n.configuration().getCollisionSpi());
+        }
+    }
+
+    /**
+     * Task that creates jobs.
+     */
+    private static class ComputeTask extends ComputeTaskAdapter<Object, Void> {
+        /** Compute job class. */
+        final Class<? extends ComputeJobAdapter> jobClass;
+
+        /**
+         * Constructor.
+         *
+         * @param jobClass Compute job class.
+         */
+        ComputeTask(Class<? extends ComputeJobAdapter> jobClass) {
+            this.jobClass = jobClass;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            @Nullable Object arg
+        ) throws IgniteException {
+            return subgrid.stream().collect(toMap(n -> newComputeJobInstance(arg), identity()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+
+        /**
+         * @param arg Argument.
+         * @return New instance of {@link #jobClass}.
+         */
+        private ComputeJobAdapter newComputeJobInstance(@Nullable Object arg) {
+            try {
+                if (arg == null)
+                    return jobClass.newInstance();
+
+                return jobClass.getDeclaredConstructor(arg.getClass()).newInstance(arg);
+            }
+            catch (Exception e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /**
+     * A job that is waiting for the latch counter to decrement in order to complete its work.
+     */
+    private static class CountDownLatchJob extends ComputeJobAdapter {
+        /** All jobs. */
+        static final Collection<CountDownLatchJob> JOBS = new ConcurrentLinkedQueue<>();
+
+        /** Latch. */
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        /** Interrupted. */
+        volatile boolean interrupted;
+
+        /**
+         * Constructor.
+         */
+        public CountDownLatchJob() {
+            JOBS.add(this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            try {
+                latch.await();
+            }
+            catch (InterruptedException e) {
+                interrupted = true;
+
+                Thread.currentThread().interrupt();
+            }
+
+            return null;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java
index 45d4951a763..f9cf4fc9a1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java
@@ -47,6 +47,10 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.junit.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
 /**
  * Test of session siblings cancellation from future.
  */
@@ -73,7 +77,7 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
     /**
      *
      */
-    public GridSessionCancelSiblingsFromFutureSelfTest() {
+    public GridSessionCancelSiblingsFromFutureSelfTest() throws Exception {
         super(true);
     }
 
@@ -92,6 +96,14 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
         return c;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeFirstTest() throws Exception {
+        super.beforeFirstTest();
+
+        // Shorten the job worker interrupt timeout to 10 ms because the compute jobs fall asleep.
+        assertTrue(computeJobWorkerInterruptTimeout(G.ignite(getTestIgniteInstanceName())).propagate(10L));
+    }
+
     /**
      * @throws Exception if failed
      */
@@ -114,18 +126,16 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
 
         final AtomicBoolean failed = new AtomicBoolean(false);
 
-        GridTestUtils.runMultiThreaded(new Runnable() {
-            @Override public void run() {
-                int num = sNum.get();
+        GridTestUtils.runMultiThreaded(() -> {
+            int num = sNum.get();
 
-                try {
-                    checkTask(num);
-                }
-                catch (Throwable e) {
-                    error("Failed to execute task.", e);
+            try {
+                checkTask(num);
+            }
+            catch (Throwable e) {
+                error("Failed to execute task.", e);
 
-                    failed.set(true);
-                }
+                failed.set(true);
             }
         }, EXEC_COUNT, "grid-session-test");
 
@@ -143,13 +153,11 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
 
         ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridTaskSessionTestTask.class, num);
 
-        assert fut != null;
+        assertNotNull(fut);
 
         try {
             // Wait until jobs begin execution.
-            boolean await = startSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
-
-            assert await : "Jobs did not start.";
+            assertTrue("Jobs did not start.", startSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
 
             Collection<ComputeJobSibling> jobSiblings = fut.getTaskSession().getJobSiblings();
 
@@ -158,23 +166,19 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
                 jobSibling.cancel();
             }
 
-            Object res = fut.get();
+            Object res = fut.get(getTestTimeout());
 
-            assert "interrupt-task-data".equals(res) : "Invalid task result: " + res;
+            assertThat(res, equalTo("interrupt-task-data"));
 
             // Wait for all jobs to finish.
-            await = stopSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
-
-            assert await : "Jobs did not cancel.";
-
-            int cnt = interruptCnt[num].get();
+            assertTrue("Jobs did not cancel.", stopSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
 
-            assert cnt == SPLIT_COUNT : "Invalid interrupt count value: " + cnt;
+            assertThat(interruptCnt[num].get(), equalTo(SPLIT_COUNT));
         }
         finally {
             // We must wait for the jobs to be sure that they have completed
             // their execution since they use static variable (shared for the tests).
-            fut.get();
+            fut.get(getTestTimeout());
         }
     }
 
@@ -213,11 +217,11 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
             if (log.isInfoEnabled())
                 log.info("Splitting jobs [task=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
 
-            assert arg != null;
+            assertNotNull(arg);
 
             taskNum = (Integer)arg;
 
-            assert taskNum != -1;
+            assertThat(taskNum, not(equalTo(-1)));
 
             Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);
 
@@ -226,7 +230,7 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
                     private volatile Thread thread;
 
                     @Override public Serializable execute() {
-                        assert taskSes != null;
+                        assertNotNull(taskSes);
 
                         thread = Thread.currentThread();
 
@@ -255,7 +259,7 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
                     }
 
                     @Override public void cancel() {
-                        assert thread != null;
+                        assertNotNull(thread);
 
                         interruptCnt[taskNum].incrementAndGet();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java
index 5b24d3a7061..393a1d109b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java
@@ -51,6 +51,10 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.junit.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
 /**
  * Session cancellation tests.
  */
@@ -94,6 +98,14 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
         return c;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeFirstTest() throws Exception {
+        super.beforeFirstTest();
+
+        // Shorten the job worker interrupt timeout to 10 ms because the compute jobs fall asleep.
+        assertTrue(computeJobWorkerInterruptTimeout(G.ignite(getTestIgniteInstanceName())).propagate(10L));
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -116,18 +128,16 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
 
         final AtomicBoolean failed = new AtomicBoolean(false);
 
-        GridTestUtils.runMultiThreaded(new Runnable() {
-            @Override public void run() {
-                int num = sNum.get();
+        GridTestUtils.runMultiThreaded(() -> {
+            int num = sNum.get();
 
-                try {
-                    checkTask(num);
-                }
-                catch (Throwable e) {
-                    error("Failed to execute task.", e);
+            try {
+                checkTask(num);
+            }
+            catch (Throwable e) {
+                error("Failed to execute task.", e);
 
-                    failed.set(true);
-                }
+                failed.set(true);
             }
         }, EXEC_COUNT, "grid-session-test");
 
@@ -145,26 +155,22 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
 
         ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridTaskSessionTestTask.class, num);
 
-        boolean await = startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
+        assertNotNull(fut);
 
         // Wait until jobs begin execution.
-        assert await : "Jobs did not start.";
-
-        assert fut != null;
+        assertTrue("Jobs did not start.", startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
 
-        Object res = fut.get();
+        Object res = fut.get(getTestTimeout());
 
-        assert "interrupt-task-data".equals(res) : "Invalid task result: " + res;
-
-        await = stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
+        assertThat(res, equalTo("interrupt-task-data"));
 
         // Wait for all jobs to finish.
-        assert await :
-            "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']';
-
-        int cnt = interruptCnt[num].get();
+        assertTrue(
+            "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']',
+            stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS)
+        );
 
-        assert cnt == SPLIT_COUNT - 1 : "Invalid interrupt count value: " + cnt;
+        assertThat(interruptCnt[num].get(), equalTo(SPLIT_COUNT - 1));
     }
 
     /** */
@@ -203,11 +209,11 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
             if (log.isInfoEnabled())
                 log.info("Splitting job [task=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
 
-            assert arg != null;
+            assertNotNull(arg);
 
             taskNum = (Integer)arg;
 
-            assert taskNum != -1;
+            assertThat(taskNum, not(equalTo(-1)));
 
             Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);
 
@@ -222,7 +228,7 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
 
                     /** {@inheritDoc} */
                     @Override public Object execute() {
-                        assert taskSes != null;
+                        assertNotNull(taskSes);
 
                         thread = Thread.currentThread();
 
@@ -241,7 +247,7 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
                                 if (log.isInfoEnabled())
                                     log.info("Job one is proceeding [jobId=" + jobId + ']');
 
-                                assert jobId != null;
+                                assertNotNull(jobId);
 
                                 Collection<ComputeJobSibling> jobSiblings = taskSes.getJobSiblings();
 
@@ -270,7 +276,7 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
 
                     /** {@inheritDoc} */
                     @Override public void cancel() {
-                        assert thread != null;
+                        assertNotNull(thread);
 
                         interruptCnt[taskNum].incrementAndGet();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java
index abb9dac732e..6f41ea29be8 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java
@@ -49,6 +49,10 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.junit.Test;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
 /**
  * Session cancellation tests.
  */
@@ -92,6 +96,14 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
         return c;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeFirstTest() throws Exception {
+        super.beforeFirstTest();
+
+        // Shorten the job worker interrupt timeout to 10 ms because the compute jobs fall asleep.
+        assertTrue(computeJobWorkerInterruptTimeout(G.ignite(getTestIgniteInstanceName())).propagate(10L));
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -114,18 +126,16 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
 
         final AtomicBoolean failed = new AtomicBoolean(false);
 
-        GridTestUtils.runMultiThreaded(new Runnable() {
-            @Override public void run() {
-                int num = sNum.get();
+        GridTestUtils.runMultiThreaded(() -> {
+            int num = sNum.get();
 
-                try {
-                    checkTask(num);
-                }
-                catch (Throwable e) {
-                    error("Failed to execute task.", e);
+            try {
+                checkTask(num);
+            }
+            catch (Throwable e) {
+                error("Failed to execute task.", e);
 
-                    failed.set(true);
-                }
+                failed.set(true);
             }
         }, EXEC_COUNT, "grid-session-test");
 
@@ -143,32 +153,28 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
 
         ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridTaskSessionTestTask.class, num);
 
-        assert fut != null;
+        assertNotNull(fut);
 
         try {
             // Wait until jobs begin execution.
-            boolean await = startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
-
-            assert await : "Jobs did not start.";
+            assertTrue("Jobs did not start.", startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
 
-            Object res = fut.get();
+            Object res = fut.get(getTestTimeout());
 
-            assert "interrupt-task-data".equals(res) : "Invalid task result: " + res;
+            assertThat(res, equalTo("interrupt-task-data"));
 
             // Wait for all jobs to finish.
-            await = stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
-
-            assert await :
-                "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']';
-
-            int cnt = interruptCnt[num].get();
+            assertTrue(
+                "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']',
+                stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS)
+            );
 
-            assert cnt == SPLIT_COUNT - 1 : "Invalid interrupt count value: " + cnt;
+            assertThat(interruptCnt[num].get(), equalTo(SPLIT_COUNT - 1));
         }
         finally {
             // We must wait for the jobs to be sure that they have completed
             // their execution since they use static variable (shared for the tests).
-            fut.get();
+            fut.get(getTestTimeout());
         }
     }
 
@@ -208,11 +214,11 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
             if (log.isInfoEnabled())
                 log.info("Splitting job [job=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
 
-            assert arg != null;
+            assertNotNull(arg);
 
             taskNum = (Integer)arg;
 
-            assert taskNum != -1;
+            assertThat(taskNum, not(equalTo(-1)));
 
             Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);
 
@@ -223,7 +229,7 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
 
                     /** {@inheritDoc} */
                     @Override public Serializable execute() {
-                        assert taskSes != null;
+                        assertNotNull(taskSes);
 
                         thread = Thread.currentThread();
 
@@ -260,7 +266,7 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
 
                     /** {@inheritDoc} */
                     @Override public void cancel() {
-                        assert thread != null;
+                        assertNotNull(thread);
 
                         interruptCnt[taskNum].incrementAndGet();
 
@@ -279,7 +285,7 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
 
                 IgniteUuid jobId = received.get(0).getJobContext().getJobId();
 
-                assert jobId != null;
+                assertNotNull(jobId);
 
                 // Cancel all jobs except first job with argument 1.
                 for (ComputeJobSibling jobSibling : jobSiblings) {
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index eff81839ea8..19c11f3796e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1967,25 +1967,62 @@ public final class GridTestUtils {
      * @return {@code true} if condition was achieved, {@code false} otherwise.
      * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
      */
-    public static boolean waitForCondition(GridAbsPredicate cond, long timeout) throws IgniteInterruptedCheckedException {
+    public static boolean waitForCondition(
+        GridAbsPredicate cond,
+        long timeout
+    ) throws IgniteInterruptedCheckedException {
+        return waitForCondition(cond, timeout, DFLT_BUSYWAIT_SLEEP_INTERVAL);
+    }
+
+    /**
+     * Waits for condition, polling in busy wait loop with the given interval between checks.
+     * @param cond Condition to wait for.
+     * @param timeout Max time to wait in milliseconds.
+     * @param checkInterval Time interval between two consecutive condition checks.
+     * @return {@code true} if condition was achieved, {@code false} otherwise.
+     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
+     */
+    public static boolean waitForCondition(
+        GridAbsPredicate cond,
+        long timeout,
+        long checkInterval
+    ) throws IgniteInterruptedCheckedException {
         long endTime = U.currentTimeMillis() + timeout;
         long endTime0 = endTime < 0 ? Long.MAX_VALUE : endTime;
 
-        return waitForCondition(cond, () -> U.currentTimeMillis() < endTime0);
+        return waitForCondition(cond, () -> U.currentTimeMillis() < endTime0, checkInterval);
+    }
+
+    /**
+     * @param cond Condition to wait for.
+     * @param wait Wait predicate; the condition is polled while it returns {@code true}.
+     * @return {@code true} if condition was achieved, {@code false} otherwise.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public static boolean waitForCondition(
+        GridAbsPredicate cond,
+        BooleanSupplier wait
+    ) throws IgniteInterruptedCheckedException {
+        return waitForCondition(cond, wait, DFLT_BUSYWAIT_SLEEP_INTERVAL);
     }
 
     /**
      * @param cond Condition to wait for.
      * @param wait Wait predicate.
+     * @param checkInterval Time interval between two consecutive condition checks.
      * @return {@code true} if condition was achieved, {@code false} otherwise.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    public static boolean waitForCondition(GridAbsPredicate cond, BooleanSupplier wait) throws IgniteInterruptedCheckedException {
+    public static boolean waitForCondition(
+        GridAbsPredicate cond,
+        BooleanSupplier wait,
+        long checkInterval
+    ) throws IgniteInterruptedCheckedException {
         while (wait.getAsBoolean()) {
             if (cond.apply())
                 return true;
 
-            U.sleep(DFLT_BUSYWAIT_SLEEP_INTERVAL);
+            U.sleep(checkInterval);
         }
 
         return false;
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 16004989f20..dfeb9c2c1f9 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -684,7 +684,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
     }
 
     /** */
-    private void beforeFirstTest() throws Exception {
+    protected void beforeFirstTest() throws Exception {
         sharedStaticIpFinder = new TcpDiscoveryVmIpFinder(true);
 
         clsLdr = Thread.currentThread().getContextClassLoader();
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 5a1ab1ddcd8..bbda79b0609 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.testframework.junits.common;
 
+import java.io.Serializable;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -114,6 +115,8 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
 import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -157,6 +160,8 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
 
 /**
  * Super class for all common tests.
@@ -2781,4 +2786,17 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
             dbMgr.checkpointReadUnlock();
         }
     }
+
+    /**
+     * @param n Node whose distributed configuration is queried.
+     * @return Non-null distributed property for {@code GridJobProcessor#COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT}.
+     */
+    protected DistributedChangeableProperty<Serializable> computeJobWorkerInterruptTimeout(Ignite n) {
+        DistributedChangeableProperty<Serializable> timeoutProperty = ((IgniteEx)n).context().distributedConfiguration()
+            .property(GridJobProcessor.COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT);
+
+        assertThat(timeoutProperty, notNullValue());
+
+        return timeoutProperty;
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
index c88ab8a708d..4c172b398a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
@@ -77,7 +77,7 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr
         @Override public void evaluate() throws Throwable {
             GridSpiAbstractTest testClsInstance = (GridSpiAbstractTest)description.getTestClass().newInstance();
             try {
-                testClsInstance.beforeFirstTest();
+                testClsInstance.beforeFirstTestInternal();
 
                 base.evaluate();
             }
@@ -157,7 +157,7 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr
     }
 
     /** */
-    private void beforeFirstTest() throws Exception {
+    protected void beforeFirstTestInternal() throws Exception {
         if (autoStart) {
             GridSpiTest spiTest = GridTestUtils.getAnnotation(getClass(), GridSpiTest.class);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 8ad91a84bfa..f34166a6a09 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.processors.compute.ComputeJobStatusTest;
 import org.apache.ignite.internal.processors.compute.ComputeTaskWithWithoutFullSupportTest;
 import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorConfigurationSelfTest;
 import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
+import org.apache.ignite.internal.processors.compute.InterruptComputeJobTest;
 import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
 import org.apache.ignite.internal.util.StripedExecutorTest;
 import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest;
@@ -181,7 +182,8 @@ import org.junit.runners.Suite;
     ComputeGridMonitorTest.class,
     ComputeJobChangePriorityTest.class,
     ComputeJobStatusTest.class,
-    ComputeTaskWithWithoutFullSupportTest.class
+    ComputeTaskWithWithoutFullSupportTest.class,
+    InterruptComputeJobTest.class
 })
 public class IgniteComputeGridTestSuite {
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
index a70c9ec353f..c8f828bb201 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
@@ -107,6 +107,9 @@ public class KillCommandsMXBeanTest extends GridCommonAbstractTest {
 
         srvs.get(0).cluster().state(ACTIVE);
 
+        // Propagate a shorter interrupt timeout to reduce the waiting time for interrupting compute jobs.
+        computeJobWorkerInterruptTimeout(srvs.get(0)).propagate(100L);
+
         IgniteCache<Object, Object> cache = startCli.getOrCreateCache(
             new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
                 .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
index 7dae6b5af94..65163ddaae1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
@@ -95,6 +95,9 @@ public class KillCommandsSQLTest extends GridCommonAbstractTest {
 
         srvs.get(0).cluster().state(ACTIVE);
 
+        // Propagate a shorter interrupt timeout to reduce the waiting time for interrupting compute jobs.
+        computeJobWorkerInterruptTimeout(srvs.get(0)).propagate(100L);
+
         IgniteCache<Object, Object> cache = startCli.getOrCreateCache(
             new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
                 .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));