You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2022/01/21 14:11:26 UTC

[ignite] branch master updated: IGNITE-16339 Dynamic task reprioritization via changing session priority attribute - Fixes #9752.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 02a322e  IGNITE-16339 Dynamic task reprioritization via changing session priority attribute - Fixes #9752.
02a322e is described below

commit 02a322e18e1e494bb658c1867e61747aa267f852
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jan 21 15:51:05 2022 +0300

    IGNITE-16339 Dynamic task reprioritization via changing session priority attribute - Fixes #9752.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../ignite/internal/GridTaskSessionImpl.java       |  11 +-
 .../managers/collision/GridCollisionManager.java   |  14 +-
 .../processors/job/GridJobEventListener.java       |  11 +-
 .../internal/processors/job/GridJobProcessor.java  | 104 ++++++++-
 .../internal/processors/job/GridJobWorker.java     |   3 +
 .../processors/task/GridTaskProcessor.java         |  15 +-
 .../priorityqueue/PriorityQueueCollisionSpi.java   |  92 +++-----
 .../GridMultithreadedJobStealingSelfTest.java      |  76 +++---
 .../processors/compute/ComputeGridMonitorTest.java |  13 +-
 .../compute/ComputeJobChangePriorityTest.java      | 259 +++++++++++++++++++++
 .../ComputeTaskWithWithoutFullSupportTest.java     | 142 +++++++++++
 .../internal/processors/compute/NoopJob.java}      |  27 +--
 .../collision/GridTestCollisionTaskSession.java    | 140 +++++------
 .../testsuites/IgniteComputeGridTestSuite.java     |   6 +-
 14 files changed, 707 insertions(+), 206 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
index 1088032..25742b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobSibling;
 import org.apache.ignite.compute.ComputeTaskSessionAttributeListener;
+import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
 import org.apache.ignite.compute.ComputeTaskSessionScope;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -105,7 +106,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
     /** */
     private final AtomicInteger usage = new AtomicInteger(1);
 
-    /** */
+    /** Task supports session attributes and checkpoints (there is {@link ComputeTaskSessionFullSupport}). */
     private final boolean fullSup;
 
     /** */
@@ -211,13 +212,17 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
     }
 
     /**
+     * Checks that the task supports session attributes and checkpoints
+     * (there is {@link ComputeTaskSessionFullSupport}).
      *
+     * @throws IllegalStateException If not supported.
      */
     protected void checkFullSupport() {
-        if (!fullSup)
+        if (!fullSup) {
             throw new IllegalStateException("Sessions attributes and checkpoints are disabled by default " +
                 "for better performance (to enable, annotate task class with " +
-                "@ComputeTaskSessionFullSupport annotation).");
+                "@" + ComputeTaskSessionFullSupport.class.getSimpleName() + " annotation).");
+        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java
index 2282e60..0e41221 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java
@@ -104,27 +104,33 @@ public class GridCollisionManager extends GridManagerAdapter<CollisionSpi> {
     }
 
     /**
-     * @param waitJobs List of waiting jobs.
-     * @param activeJobs List of active jobs.
-     * @param heldJobs List of held jobs.
+     * Invoke collision SPI.
+     *
+     * @param waitJobs Collection of waiting jobs.
+     * @param activeJobs Collection of active jobs.
+     * @param heldJobs Collection of held jobs.
      */
     public void onCollision(
         final Collection<CollisionJobContext> waitJobs,
         final Collection<CollisionJobContext> activeJobs,
-        final Collection<CollisionJobContext> heldJobs) {
+        final Collection<CollisionJobContext> heldJobs
+    ) {
         if (enabled()) {
             if (log.isDebugEnabled())
                 log.debug("Resolving job collisions [waitJobs=" + waitJobs + ", activeJobs=" + activeJobs + ']');
 
             getSpi().onCollision(new CollisionContext() {
+                /** {@inheritDoc} */
                 @Override public Collection<CollisionJobContext> activeJobs() {
                     return activeJobs;
                 }
 
+                /** {@inheritDoc} */
                 @Override public Collection<CollisionJobContext> waitingJobs() {
                     return waitJobs;
                 }
 
+                /** {@inheritDoc} */
                 @Override public Collection<CollisionJobContext> heldJobs() {
                     return heldJobs;
                 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
index 3eed487..32af38b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
@@ -24,17 +24,22 @@ import java.util.EventListener;
  */
 interface GridJobEventListener extends EventListener {
     /**
+     * @param worker Job worker.
+     */
+    void onJobQueued(GridJobWorker worker);
+
+    /**
      * @param worker Started job worker.
      */
-    public void onJobStarted(GridJobWorker worker);
+    void onJobStarted(GridJobWorker worker);
 
     /**
      * @param worker Job worker.
      */
-    public void onBeforeJobResponseSent(GridJobWorker worker);
+    void onBeforeJobResponseSent(GridJobWorker worker);
 
     /**
      * @param worker Finished job worker.
      */
-    public void onJobFinished(GridJobWorker worker);
+    void onJobFinished(GridJobWorker worker);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 5abe0dc..122b2ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.GridTaskSessionRequest;
 import org.apache.ignite.internal.SkipDaemon;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.collision.GridCollisionJobContextAdapter;
+import org.apache.ignite.internal.managers.collision.GridCollisionManager;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -90,6 +91,8 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.spi.collision.CollisionSpi;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
 import org.apache.ignite.spi.metric.DoubleMetric;
 import org.apache.ignite.spi.systemview.view.ComputeJobView;
 import org.apache.ignite.spi.systemview.view.ComputeJobView.ComputeJobState;
@@ -166,7 +169,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
     /** */
     private final Marshaller marsh;
 
-    /** */
+    /** Collision SPI is not available: {@link GridCollisionManager#enabled()} {@code == false}. */
     private final boolean jobAlwaysActivate;
 
     /** */
@@ -298,6 +301,18 @@ public class GridJobProcessor extends GridProcessorAdapter {
     private final ThreadLocal<GridJobSessionImpl> currSess = new ThreadLocal<>();
 
     /**
+     * {@link PriorityQueueCollisionSpi#getPriorityAttributeKey} or
+     * {@link null} if the {@link PriorityQueueCollisionSpi} is not configured.
+     */
+    @Nullable private final String taskPriAttrKey;
+
+    /**
+     * {@link PriorityQueueCollisionSpi#getJobPriorityAttributeKey} or
+     * {@link null} if the {@link PriorityQueueCollisionSpi} is not configured.
+     */
+    @Nullable private final String jobPriAttrKey;
+
+    /**
      * @param ctx Kernal context.
      */
     public GridJobProcessor(GridKernalContext ctx) {
@@ -352,6 +367,17 @@ public class GridJobProcessor extends GridProcessorAdapter {
 
                 return new ComputeJobView(e.getKey(), e.getValue(), state);
             });
+
+        CollisionSpi collisionSpi = ctx.config().getCollisionSpi();
+
+        if (!jobAlwaysActivate && collisionSpi instanceof PriorityQueueCollisionSpi) {
+            taskPriAttrKey = ((PriorityQueueCollisionSpi)collisionSpi).getPriorityAttributeKey();
+            jobPriAttrKey = ((PriorityQueueCollisionSpi)collisionSpi).getJobPriorityAttributeKey();
+        }
+        else {
+            taskPriAttrKey = null;
+            jobPriAttrKey = null;
+        }
     }
 
     /** {@inheritDoc} */
@@ -872,24 +898,29 @@ public class GridJobProcessor extends GridProcessorAdapter {
             ctx.collision().onCollision(
                 // Passive jobs view.
                 new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>() {
+                    /** {@inheritDoc} */
                     @NotNull @Override public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
                         final Iterator<GridJobWorker> iter = passiveJobs.values().iterator();
 
                         return new Iterator<org.apache.ignite.spi.collision.CollisionJobContext>() {
+                            /** {@inheritDoc} */
                             @Override public boolean hasNext() {
                                 return iter.hasNext();
                             }
 
+                            /** {@inheritDoc} */
                             @Override public org.apache.ignite.spi.collision.CollisionJobContext next() {
                                 return new CollisionJobContext(iter.next(), true);
                             }
 
+                            /** {@inheritDoc} */
                             @Override public void remove() {
                                 throw new UnsupportedOperationException();
                             }
                         };
                     }
 
+                    /** {@inheritDoc} */
                     @Override public int size() {
                         return passiveJobs.size();
                     }
@@ -897,6 +928,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
 
                 // Active jobs view.
                 new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>() {
+                    /** {@inheritDoc} */
                     @NotNull @Override public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
                         final Iterator<GridJobWorker> iter = activeJobs.values().iterator();
 
@@ -926,10 +958,12 @@ public class GridJobProcessor extends GridProcessorAdapter {
                                 }
                             }
 
+                            /** {@inheritDoc} */
                             @Override public boolean hasNext() {
                                 return w != null;
                             }
 
+                            /** {@inheritDoc} */
                             @Override public org.apache.ignite.spi.collision.CollisionJobContext next() {
                                 if (w == null)
                                     throw new NoSuchElementException();
@@ -943,21 +977,24 @@ public class GridJobProcessor extends GridProcessorAdapter {
                                 return ret;
                             }
 
+                            /** {@inheritDoc} */
                             @Override public void remove() {
                                 throw new UnsupportedOperationException();
                             }
                         };
                     }
 
+                    /** {@inheritDoc} */
                     @Override public int size() {
                         int ret = activeJobs.size() - heldJobs.size();
 
-                        return ret > 0 ? ret : 0;
+                        return Math.max(ret, 0);
                     }
                 },
 
                 // Held jobs view.
                 new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>() {
+                    /** {@inheritDoc} */
                     @NotNull @Override public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
                         final Iterator<GridJobWorker> iter = activeJobs.values().iterator();
 
@@ -987,10 +1024,12 @@ public class GridJobProcessor extends GridProcessorAdapter {
                                 }
                             }
 
+                            /** {@inheritDoc} */
                             @Override public boolean hasNext() {
                                 return w != null;
                             }
 
+                            /** {@inheritDoc} */
                             @Override public org.apache.ignite.spi.collision.CollisionJobContext next() {
                                 if (w == null)
                                     throw new NoSuchElementException();
@@ -1005,12 +1044,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
                                 return ret;
                             }
 
+                            /** {@inheritDoc} */
                             @Override public void remove() {
                                 throw new UnsupportedOperationException();
                             }
                         };
                     }
 
+                    /** {@inheritDoc} */
                     @Override public int size() {
                         return heldJobs.size();
                     }
@@ -1688,6 +1729,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
             synchronized (ses) {
                 ses.setInternal(attrs);
             }
+
+            onChangeTaskAttributes(req.getSessionId(), req.getJobId(), attrs);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to deserialize session attributes.", e);
@@ -1944,6 +1987,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
         private final GridMessageListener sesLsnr = new JobSessionListener();
 
         /** {@inheritDoc} */
+        @Override public void onJobQueued(GridJobWorker worker) {
+            if (worker.getSession().isFullSupport()) {
+                // Register session request listener for this job.
+                ctx.io().addMessageListener(worker.getJobTopic(), sesLsnr);
+            }
+        }
+
+        /** {@inheritDoc} */
         @Override public void onJobStarted(GridJobWorker worker) {
             if (log.isDebugEnabled())
                 log.debug("Received onJobStarted() callback: " + worker);
@@ -1954,10 +2005,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
             // Register for timeout notifications.
             if (worker.endTime() < Long.MAX_VALUE)
                 ctx.timeout().addTimeoutObject(worker);
-
-            if (worker.getSession().isFullSupport())
-                // Register session request listener for this job.
-                ctx.io().addMessageListener(worker.getJobTopic(), sesLsnr);
         }
 
         /** {@inheritDoc} */
@@ -2294,6 +2341,51 @@ public class GridJobProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Callback on changing task attributes.
+     *
+     * @param sesId Session ID.
+     * @param jobId Job ID.
+     * @param attrs Changed attributes.
+     */
+    public void onChangeTaskAttributes(IgniteUuid sesId, IgniteUuid jobId, Map<?, ?> attrs) {
+        if (!rwLock.tryReadLock()) {
+            if (log.isDebugEnabled())
+                log.debug("Callback on changing the task attributes will be ignored " +
+                    "(node is in the process of stopping): " + sesId);
+
+            return;
+        }
+
+        try {
+            if (jobAlwaysActivate || (taskPriAttrKey == null && jobPriAttrKey == null))
+                return;
+
+            GridJobWorker jobWorker = passiveJobs.get(jobId);
+
+            if (jobWorker == null || jobWorker.isInternal())
+                return;
+
+            boolean handleCollisions = false;
+
+            if (taskPriAttrKey != null && attrs.containsKey(taskPriAttrKey)) {
+                // See PriorityQueueCollisionSpi#bumpPriority.
+                jobWorker.getSession().setAttribute(jobPriAttrKey, attrs.get(taskPriAttrKey));
+
+                handleCollisions = true;
+            }
+
+            if (!handleCollisions && jobPriAttrKey != null && attrs.containsKey(jobPriAttrKey))
+                handleCollisions = true;
+
+            if (handleCollisions)
+                handleCollisions();
+        }
+        finally {
+            rwLock.readUnlock();
+        }
+    }
+
+    /**
      * @param sesId Task session ID.
      * @return Job statistics for the task. Mapping: Job status -> count of jobs.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index cb2a5f4..09ef5ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -490,6 +490,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
             // Inject resources.
             ctx.resource().inject(dep, taskCls, job, ses, jobCtx);
 
+            // Event notification.
+            evtLsnr.onJobQueued(this);
+
             if (!internal && ctx.event().isRecordable(EVT_JOB_QUEUED))
                 recordEvent(EVT_JOB_QUEUED, "Job got queued for computation.");
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 9624af8..0258114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1057,7 +1057,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
 
                 UUID nodeId = sib.nodeId();
 
-                if (!nodeId.equals(locNodeId) && !sib.isJobDone() && !rcvrs.contains(nodeId))
+                if (!nodeId.equals(locNodeId) && !sib.isJobDone())
                     rcvrs.add(nodeId);
             }
         }
@@ -1086,8 +1086,12 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
 
             UUID nodeId = sib.nodeId();
 
-            // Pair can be null if job is finished.
-            if (rcvrs.remove(nodeId)) {
+            if (locNodeId.equals(nodeId)) {
+                // Local job notification.
+                ctx.job().onChangeTaskAttributes(ses.getId(), s.getJobId(), attrs);
+            }
+            else if (rcvrs.remove(nodeId)) {
+                // Pair can be null if job is finished.
                 ClusterNode node = ctx.discovery().node(nodeId);
 
                 // Check that node didn't change (it could happen in case of failover).
@@ -1096,9 +1100,10 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
 
                     GridTaskSessionRequest req = new GridTaskSessionRequest(
                         ses.getId(),
-                        null,
+                        s.getJobId(),
                         loc ? null : U.marshal(marsh, attrs),
-                        attrs);
+                        attrs
+                    );
 
                     // Make sure to go through IO manager always, since order
                     // should be preserved here.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
index d4247e4..a40e3c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.spi.collision.priorityqueue;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.GridTaskSessionInternal;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -239,7 +238,8 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
     private volatile boolean preventStarvation = DFLT_PREVENT_STARVATION_ENABLED;
 
     /** Cached priority comparator instance. */
-    private Comparator<GridCollisionJobContextWrapper> priComp;
+    private final Comparator<GridCollisionJobContextWrapper> priComp =
+        Comparator.comparing(w -> getJobPriority(w.ctx), Comparator.reverseOrder());
 
     /** */
     @LoggerResource
@@ -530,7 +530,7 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
                 }
             }
             else {
-                Collections.sort(waitSnap, priorityComparator());
+                waitSnap.sort(priComp);
                 waitSnapSorted = true;
 
                 if (preventStarvation)
@@ -551,7 +551,7 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
             int skip = waitSnap.size() - waitSize;
 
             if (!waitSnapSorted)
-                Collections.sort(waitSnap, priorityComparator());
+                waitSnap.sort(priComp);
 
             int i = 0;
 
@@ -604,7 +604,8 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
 
     /**
      * Gets job priority. At first tries to get from job context. If job context has no priority,
-     * then tries to get from task session. If task session has no priority default one will be used.
+     * then tries to get from task session. If task session does not support attributes or there
+     * is no priority in them, then the default priority is used.
      *
      * @param ctx Collision job context.
      * @return Job priority.
@@ -612,43 +613,49 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
     private int getJobPriority(CollisionJobContext ctx) {
         assert ctx != null;
 
-        Integer p = null;
+        Integer pri = null;
 
-        ComputeJobContext jctx = ctx.getJobContext();
+        ComputeJobContext jobCtx = ctx.getJobContext();
 
         try {
-            p = (Integer)jctx.getAttribute(jobPriAttrKey);
+            pri = jobCtx.getAttribute(jobPriAttrKey);
         }
         catch (ClassCastException e) {
             LT.error(log, e, "Type of job context priority attribute '" + jobPriAttrKey +
-                "' is not java.lang.Integer [type=" + jctx.getAttribute(jobPriAttrKey).getClass() + ']');
+                "' is not java.lang.Integer [type=" + jobCtx.getAttribute(jobPriAttrKey).getClass() + ']');
         }
 
-        if (p == null) {
-            ComputeTaskSession ses = ctx.getTaskSession();
+        if (pri == null) {
+            GridTaskSessionInternal taskSes = (GridTaskSessionInternal)ctx.getTaskSession();
 
-            try {
-                p = (Integer)ses.getAttribute(taskPriAttrKey);
-            }
-            catch (ClassCastException e) {
-                LT.error(log, e, "Type of task session priority attribute '" + taskPriAttrKey +
-                    "' is not java.lang.Integer [type=" + ses.getAttribute(taskPriAttrKey).getClass() + ']');
-            }
+            if (!taskSes.isFullSupport()) {
+                if (log.isDebugEnabled())
+                    log.debug("Task does not support session attributes (will use default priority): " + dfltPri);
 
-            if (p == null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Failed get priority from job context attribute '" + jobPriAttrKey +
-                        "' and task session attribute '" + taskPriAttrKey + "' (will use default priority): " +
-                        dfltPri);
+                pri = dfltPri;
+            }
+            else {
+                try {
+                    pri = taskSes.getAttribute(taskPriAttrKey);
+                }
+                catch (ClassCastException e) {
+                    LT.error(log, e, "Type of task session priority attribute '" + taskPriAttrKey +
+                        "' is not java.lang.Integer [type=" + taskSes.getAttribute(taskPriAttrKey).getClass() + ']');
                 }
 
-                p = dfltPri;
+                if (pri == null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Failed get priority from job context attribute '" + jobPriAttrKey +
+                            "' and task session attribute '" + taskPriAttrKey + "' (will use default priority): " +
+                            dfltPri);
+                    }
+
+                    pri = dfltPri;
+                }
             }
         }
 
-        assert p != null;
-
-        return p;
+        return pri;
     }
 
     /** {@inheritDoc} */
@@ -656,19 +663,6 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
         return Collections.singletonList(createSpiAttributeName(PRIORITY_ATTRIBUTE_KEY));
     }
 
-    /**
-     * Returns (possibly shared) comparator fo sorting GridCollisionJobContextWrapper
-     * by priority.
-     *
-     * @return Comparator for priority sorting.
-     */
-    private Comparator<GridCollisionJobContextWrapper> priorityComparator() {
-        if (priComp == null)
-            priComp = new PriorityGridCollisionJobContextComparator();
-
-        return priComp;
-    }
-
     /** {@inheritDoc} */
     @Override public PriorityQueueCollisionSpi setName(String name) {
         super.setName(name);
@@ -682,22 +676,6 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
     }
 
     /**
-     * Comparator for by priority comparison of collision contexts.
-     */
-    private class PriorityGridCollisionJobContextComparator implements Comparator<GridCollisionJobContextWrapper>, Serializable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public int compare(GridCollisionJobContextWrapper o1, GridCollisionJobContextWrapper o2) {
-            int p1 = getJobPriority(o1.getContext());
-            int p2 = getJobPriority(o2.getContext());
-
-            return p1 < p2 ? 1 : p1 == p2 ? 0 : -1;
-        }
-    }
-
-    /**
      * Wrapper class to keep original collision context position.
      */
     private static class GridCollisionJobContextWrapper {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
index fcee6df..be45162 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
@@ -68,14 +68,41 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
         ignite = startGridsMultiThreaded(2);
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        ignite = null;
+        super.afterTest();
 
         stopAllGrids();
+
+        ignite = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi();
+
+        // One job at a time.
+        colSpi.setActiveJobsThreshold(1);
+        colSpi.setWaitJobsThreshold(0);
+
+        JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi();
+
+        // Verify defaults.
+        assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS;
+
+        cfg.setCollisionSpi(colSpi);
+        cfg.setFailoverSpi(failSpi);
+
+        return cfg;
     }
 
     /**
@@ -94,7 +121,9 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
         int threadsNum = 10;
 
         GridTestUtils.runMultiThreaded(new Runnable() {
-            /** */
+            /**
+             *
+             */
             @Override public void run() {
                 try {
                     JobStealingResult res = ignite.compute().execute(new JobStealingTask(2), null);
@@ -122,7 +151,7 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
         // Total jobs number is threadsNum * 2
         assertEquals("Incorrect processed jobs number", threadsNum * 2, stolen.get() + noneStolen.get());
 
-        assertFalse( "No jobs were stolen.", stolen.get() == 0);
+        assertFalse("No jobs were stolen.", stolen.get() == 0);
 
         for (Ignite g : G.allGrids())
             assertTrue("Node get no jobs.", nodes.contains(g.name()));
@@ -130,7 +159,7 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
         // Under these circumstances we should not have  more than 2 jobs
         // difference.
         //(but muted to 4 due to very rare fails and low priority of fix)
-        assertTrue( "Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
+        assertTrue("Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
             Math.abs(stolen.get() - noneStolen.get()) <= 4);
     }
 
@@ -154,7 +183,9 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
         jobExecutedLatch = new CountDownLatch(threadsNum);
 
         final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new Runnable() {
-            /** */
+            /**
+             *
+             */
             @Override public void run() {
                 try {
                     final IgniteCompute compute = ignite.compute().withAsync();
@@ -190,38 +221,16 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
 
         assertNull("Test failed with exception: ", fail.get());
 
-        // Total jobs number is threadsNum * 3
+        // Total jobs number is threadsNum * 4
         assertEquals("Incorrect processed jobs number", threadsNum * jobsPerTask, stolen.get() + noneStolen.get());
 
-        assertFalse( "No jobs were stolen.", stolen.get() == 0);
+        assertFalse("No jobs were stolen.", stolen.get() == 0);
 
         for (Ignite g : G.allGrids())
             assertTrue("Node get no jobs.", nodes.contains(g.name()));
 
-        assertTrue( "Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
-            Math.abs(stolen.get() - 2 * noneStolen.get()) <= 6);
-    }
-
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
-        JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi();
-
-        // One job at a time.
-        colSpi.setActiveJobsThreshold(1);
-        colSpi.setWaitJobsThreshold(0);
-
-        JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi();
-
-        // Verify defaults.
-        assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS;
-
-        cfg.setCollisionSpi(colSpi);
-        cfg.setFailoverSpi(failSpi);
-
-        return cfg;
+        assertTrue("Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
+            Math.abs(stolen.get() - 2 * noneStolen.get()) <= 8);
     }
 
     /**
@@ -300,7 +309,10 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
         /** {@inheritDoc} */
         @Override public Serializable execute() {
             try {
-                jobExecutedLatch.countDown();
+                CountDownLatch latch = jobExecutedLatch;
+
+                if (latch != null)
+                    latch.countDown();
 
                 Long sleep = argument(0);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
index ebb5be1..44a06a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeTaskAdapter;
 import org.apache.ignite.compute.ComputeTaskFuture;
@@ -320,7 +319,7 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest {
             List<ClusterNode> subgrid,
             Void arg
         ) throws IgniteException {
-            return subgrid.stream().collect(toMap(n -> new NoopComputeJob(), identity()));
+            return subgrid.stream().collect(toMap(n -> new NoopJob(), identity()));
         }
 
         /** {@inheritDoc} */
@@ -350,7 +349,7 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest {
         ) throws IgniteException {
             doneOnMapFut.onDone();
 
-            return subgrid.stream().collect(toMap(n -> new NoopComputeJob() {
+            return subgrid.stream().collect(toMap(n -> new NoopJob() {
                 /** {@inheritDoc} */
                 @Override public Object execute() throws IgniteException {
                     try {
@@ -371,14 +370,6 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest {
         }
     }
 
-    /** */
-    private static class NoopComputeJob extends ComputeJobAdapter {
-        /** {@inheritDoc} */
-        @Override public Object execute() throws IgniteException {
-            return null;
-        }
-    }
-
     /**
      * @param session Task session.
      * @param snapshot Task status snapshot.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java
new file mode 100644
index 0000000..8727164
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.CollisionJobContext;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Class for testing job priority change.
+ */
+public class ComputeJobChangePriorityTest extends GridCommonAbstractTest {
+    /** Coordinator. */
+    private static IgniteEx CRD;
+
+    /** */
+    private static Method ON_CHANGE_TASK_ATTRS_MTD;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        stopAllGrids();
+
+        IgniteEx crd = startGrids(2);
+
+        crd.cluster().state(ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        CRD = crd;
+
+        ON_CHANGE_TASK_ATTRS_MTD = GridJobProcessor.class.getDeclaredMethod(
+            "onChangeTaskAttributes",
+            IgniteUuid.class,
+            IgniteUuid.class,
+            Map.class
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+
+        CRD = null;
+        ON_CHANGE_TASK_ATTRS_MTD = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        for (Ignite n : G.allGrids())
+            PriorityQueueCollisionSpiEx.spiEx(n).reset();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setCollisionSpi(new PriorityQueueCollisionSpiEx())
+            .setMetricsUpdateFrequency(Long.MAX_VALUE)
+            .setClientFailureDetectionTimeout(Long.MAX_VALUE);
+    }
+
+    /**
+     * Checking that when {@link PriorityQueueCollisionSpi#getPriorityAttributeKey} is changed,
+     * collisions will be handled.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testChangeTaskPriorityAttribute() throws Exception {
+        checkChangeAttributes(
+            PriorityQueueCollisionSpiEx.spiEx(CRD).getPriorityAttributeKey(),
+            1,
+            true
+        );
+    }
+
+    /**
+     * Checking that when {@link PriorityQueueCollisionSpi#getJobPriorityAttributeKey} is changed,
+     * collisions will be handled.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testChangeJobPriorityAttribute() throws Exception {
+        checkChangeAttributes(
+            PriorityQueueCollisionSpiEx.spiEx(CRD).getJobPriorityAttributeKey(),
+            1,
+            true
+        );
+    }
+
+    /**
+     * Checking that no collision handling will occur when a random attribute is changed.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testChangeRandomAttribute() throws Exception {
+        checkChangeAttributes(
+            UUID.randomUUID().toString(),
+            UUID.randomUUID().toString(),
+            false
+        );
+    }
+
+    /** */
+    private void checkChangeAttributes(
+        String key,
+        Object val,
+        boolean expHandleCollisionOnChangeTaskAttrs
+    ) throws Exception {
+        ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(new NoopTask(), null);
+
+        for (Ignite n : G.allGrids())
+            PriorityQueueCollisionSpiEx.spiEx(n).waitJobFut.get(getTestTimeout());
+
+        for (Ignite n : G.allGrids())
+            PriorityQueueCollisionSpiEx.spiEx(n).handleCollision = true;
+
+        taskFut.getTaskSession().setAttribute(key, val);
+
+        for (Ignite n : G.allGrids()) {
+            assertEquals(
+                val,
+                PriorityQueueCollisionSpiEx.spiEx(n).waitJobFut.result()
+                    .getTaskSession().waitForAttribute(key, getTestTimeout()));
+        }
+
+        for (Ignite n : G.allGrids()) {
+            GridFutureAdapter<Void> fut = PriorityQueueCollisionSpiEx.spiEx(n).onChangeTaskAttrsFut;
+
+            if (expHandleCollisionOnChangeTaskAttrs)
+                fut.get(getTestTimeout());
+            else
+                assertThrows(log, () -> fut.get(100), IgniteFutureTimeoutCheckedException.class, null);
+        }
+
+        if (!expHandleCollisionOnChangeTaskAttrs)
+            CRD.compute().execute(new NoopTask(), null);
+
+        taskFut.get(getTestTimeout());
+    }
+
+    /** */
+    private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
+        /** */
+        volatile boolean handleCollision;
+
+        /** */
+        final GridFutureAdapter<CollisionJobContext> waitJobFut = new GridFutureAdapter<>();
+
+        /** */
+        final GridFutureAdapter<Void> onChangeTaskAttrsFut = new GridFutureAdapter<>();
+
+        /** {@inheritDoc} */
+        @Override public void onCollision(CollisionContext ctx) {
+            if (!waitJobFut.isDone()) {
+                ctx.waitingJobs().stream()
+                    .filter(collisionJobCtx -> collisionJobCtx.getJob() instanceof NoopJob)
+                    .findAny()
+                    .ifPresent(waitJobFut::onDone);
+            }
+
+            if (handleCollision) {
+                if (!onChangeTaskAttrsFut.isDone()) {
+                    Stream.of(new Exception().getStackTrace())
+                        .filter(el ->
+                            ON_CHANGE_TASK_ATTRS_MTD.getDeclaringClass().getName().equals(el.getClassName()) &&
+                                ON_CHANGE_TASK_ATTRS_MTD.getName().equals(el.getMethodName())
+                        )
+                        .findAny()
+                        .ifPresent(el -> onChangeTaskAttrsFut.onDone());
+                }
+
+                super.onCollision(ctx);
+            }
+        }
+
+        /** */
+        void reset() {
+            handleCollision = false;
+
+            waitJobFut.reset();
+
+            onChangeTaskAttrsFut.reset();
+        }
+
+        /** */
+        static PriorityQueueCollisionSpiEx spiEx(Ignite n) {
+            return ((PriorityQueueCollisionSpiEx)((IgniteEx)n).context().config().getCollisionSpi());
+        }
+    }
+
+    /** */
+    @ComputeTaskSessionFullSupport
+    private static class NoopTask extends ComputeTaskAdapter<Void, Void> {
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            Void arg
+        ) throws IgniteException {
+            return subgrid.stream().collect(toMap(n -> new NoopJob(), identity()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeTaskWithWithoutFullSupportTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeTaskWithWithoutFullSupportTest.java
new file mode 100644
index 0000000..cc39cea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeTaskWithWithoutFullSupportTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.GridTaskSessionInternal;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+
+/**
+ * Class for checking that there will be no errors when starting tasks with/without
+ * {@link ComputeTaskSessionFullSupport}.
+ */
+public class ComputeTaskWithWithoutFullSupportTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCollisionSpi(new PriorityQueueCollisionSpiEx().setParallelJobsNumber(1))
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setMetricsUpdateFrequency(Long.MAX_VALUE)
+            .setClientFailureDetectionTimeout(Long.MAX_VALUE);
+    }
+
+    /**
+     * Checking that if there is {@link PriorityQueueCollisionSpi},
+     * it is possible to run tasks with and without {@link ComputeTaskSessionFullSupport}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test() throws Exception {
+        IgniteEx n = startGrid(0);
+
+        n.cluster().state(ACTIVE);
+
+        ComputeTaskFuture<Void> taskFut0 = n.compute().executeAsync(new TaskWithFullSupport(), null);
+        assertTrue(((GridTaskSessionInternal)taskFut0.getTaskSession()).isFullSupport());
+
+        ((PriorityQueueCollisionSpiEx)n.configuration().getCollisionSpi()).handleCollision = true;
+
+        ComputeTaskFuture<Void> taskFut1 = n.compute().executeAsync(new TaskWithoutFullSupport(), null);
+        assertFalse(((GridTaskSessionInternal)taskFut1.getTaskSession()).isFullSupport());
+
+        taskFut0.get(TimeUnit.SECONDS.toMillis(1));
+        taskFut1.get(TimeUnit.SECONDS.toMillis(1));
+    }
+
+    /** */
+    private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
+        /** */
+        volatile boolean handleCollision;
+
+        /** {@inheritDoc} */
+        @Override public void onCollision(CollisionContext ctx) {
+            if (handleCollision)
+                super.onCollision(ctx);
+        }
+    }
+
+    /** */
+    @ComputeTaskSessionFullSupport
+    private static class TaskWithFullSupport extends ComputeTaskAdapter<Void, Void> {
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            Void arg
+        ) throws IgniteException {
+            assertFalse(subgrid.isEmpty());
+
+            return singletonMap(new NoopJob(), subgrid.get(0));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+    }
+
+    /** */
+    private static class TaskWithoutFullSupport extends ComputeTaskAdapter<Void, Void> {
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            Void arg
+        ) throws IgniteException {
+            assertFalse(subgrid.isEmpty());
+
+            return singletonMap(new NoopJob(), subgrid.get(0));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/NoopJob.java
similarity index 62%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
copy to modules/core/src/test/java/org/apache/ignite/internal/processors/compute/NoopJob.java
index 3eed487..48f8bc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/NoopJob.java
@@ -15,26 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.job;
+package org.apache.ignite.internal.processors.compute;
 
-import java.util.EventListener;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobAdapter;
 
 /**
- * Job event listener.
+ *
  */
-interface GridJobEventListener extends EventListener {
-    /**
-     * @param worker Started job worker.
-     */
-    public void onJobStarted(GridJobWorker worker);
-
-    /**
-     * @param worker Job worker.
-     */
-    public void onBeforeJobResponseSent(GridJobWorker worker);
-
-    /**
-     * @param worker Finished job worker.
-     */
-    public void onJobFinished(GridJobWorker worker);
+class NoopJob extends ComputeJobAdapter {
+    /** {@inheritDoc} */
+    @Override public Object execute() throws IgniteException {
+        return null;
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
index 3a7d757..47c00d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
@@ -21,28 +21,33 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.compute.ComputeJobSibling;
-import org.apache.ignite.compute.ComputeTaskSession;
 import org.apache.ignite.compute.ComputeTaskSessionAttributeListener;
 import org.apache.ignite.compute.ComputeTaskSessionScope;
+import org.apache.ignite.internal.GridTaskSessionInternal;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Test collision task session.
  */
-public class GridTestCollisionTaskSession implements ComputeTaskSession {
-    /** */
+public class GridTestCollisionTaskSession implements GridTaskSessionInternal {
+    /** Task priority. */
     private Integer pri = 0;
 
-    /** */
+    /** Task priority attribute key.  */
     private String priAttrKey;
 
-    /** */
+    /**
+     * Default constructor.
+     */
     public GridTestCollisionTaskSession() {
         // No-op.
     }
 
     /**
+     * Constructor.
+     *
      * @param pri Priority.
      * @param priAttrKey Priority attribute key.
      */
@@ -55,81 +60,68 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
 
     /** {@inheritDoc} */
     @Override public UUID getTaskNodeId() {
-        assert false;
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public <K, V> V waitForAttribute(K key, long timeout) {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public boolean waitForAttribute(Object key, Object val, long timeout) throws InterruptedException {
-        assert false : "Not implemented";
-
-        return false;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public Map<?, ?> waitForAttributes(Collection<?> keys, long timeout) {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
-    @Override public boolean waitForAttributes(Map<?, ?> attrs, long timeout) throws InterruptedException {
-        assert false : "Not implemented";
-
-        return false;
+    @Override public boolean waitForAttributes(Map<?, ?> attrs, long timeout) {
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public void saveCheckpoint(String key, Object state) {
-        assert false : "Not implemented";
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public void saveCheckpoint(String key, Object state, ComputeTaskSessionScope scope, long timeout) {
-        assert false : "Not implemented";
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
-    @Override public void saveCheckpoint(String key, Object state, ComputeTaskSessionScope scope, long timeout,
-        boolean overwrite) {
-        assert false : "Not implemented";
+    @Override public void saveCheckpoint(
+        String key,
+        Object state,
+        ComputeTaskSessionScope scope,
+        long timeout,
+        boolean overwrite
+    ) {
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public <T> T loadCheckpoint(String key) {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public boolean removeCheckpoint(String key) {
-        assert false : "Not implemented";
-
-        return false;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public String getTaskName() {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public IgniteUuid getId() {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
@@ -139,16 +131,12 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
 
     /** {@inheritDoc} */
     @Override public ClassLoader getClassLoader() {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public Collection<ComputeJobSibling> getJobSiblings() {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
@@ -158,14 +146,12 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
 
     /** {@inheritDoc} */
     @Override public ComputeJobSibling getJobSibling(IgniteUuid jobId) {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public void setAttribute(Object key, Object val) {
-        assert false : "Not implemented";
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
@@ -178,33 +164,27 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
 
     /** {@inheritDoc} */
     @Override public void setAttributes(Map<?, ?> attrs) {
-        assert false : "Not implemented";
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public Map<Object, Object> getAttributes() {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public void addAttributeListener(ComputeTaskSessionAttributeListener lsnr, boolean rewind) {
-        assert false : "Not implemented";
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public boolean removeAttributeListener(ComputeTaskSessionAttributeListener lsnr) {
-        assert false : "Not implemented";
-
-        return false;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> mapFuture() {
-        assert false : "Not implemented";
-
-        return null;
+        throw new UnsupportedOperationException("Not implemented");
     }
 
     /**
@@ -239,14 +219,42 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        StringBuilder buf = new StringBuilder();
+    @Override public String getCheckpointSpi() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IgniteUuid getJobId() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isTaskNode() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClosed() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClosed() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridTaskSessionInternal session() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
 
-        buf.append(getClass().getName());
-        buf.append(" [priority=").append(pri);
-        buf.append(", priorityAttrKey='").append(priAttrKey).append('\'');
-        buf.append(']');
+    /** {@inheritDoc} */
+    @Override public boolean isFullSupport() {
+        return true;
+    }
 
-        return buf.toString();
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return getClass().getName() + " [priority=" + pri + ", priorityAttrKey='" + priAttrKey + '\'' + ']';
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 61c1899..8ad91a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -80,7 +80,9 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfT
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
 import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
 import org.apache.ignite.internal.processors.compute.ComputeGridMonitorTest;
+import org.apache.ignite.internal.processors.compute.ComputeJobChangePriorityTest;
 import org.apache.ignite.internal.processors.compute.ComputeJobStatusTest;
+import org.apache.ignite.internal.processors.compute.ComputeTaskWithWithoutFullSupportTest;
 import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorConfigurationSelfTest;
 import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
 import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
@@ -177,7 +179,9 @@ import org.junit.runners.Suite;
     VisorManagementEventSelfTest.class,
 
     ComputeGridMonitorTest.class,
-    ComputeJobStatusTest.class
+    ComputeJobChangePriorityTest.class,
+    ComputeJobStatusTest.class,
+    ComputeTaskWithWithoutFullSupportTest.class
 })
 public class IgniteComputeGridTestSuite {
 }