You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/01/19 13:56:42 UTC

[ignite] branch master updated: IGNITE-16325 Add compute task monitoring to keep track of their current status (#9747)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new abec656  IGNITE-16325 Add compute task monitoring to keep track of their current status (#9747)
abec656 is described below

commit abec656e1bfa12cac7ec9376081003ecb3b05e4d
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Jan 19 16:56:10 2022 +0300

    IGNITE-16325 Add compute task monitoring to keep track of their current status (#9747)
---
 .../apache/ignite/internal/GridJobResultImpl.java  |  16 +-
 .../ignite/internal/GridTaskSessionImpl.java       |  70 +++-
 .../processors/job/ComputeJobStatusEnum.java       |  41 ++
 .../internal/processors/job/GridJobProcessor.java  |  57 ++-
 .../internal/processors/job/GridJobWorker.java     |  86 +++-
 .../session/GridTaskSessionProcessor.java          |  16 +-
 .../processors/task/GridTaskEventListener.java     |  19 +-
 .../processors/task/GridTaskProcessor.java         | 139 ++++++-
 .../internal/processors/task/GridTaskWorker.java   |  84 +++-
 .../task/monitor/ComputeGridMonitor.java           |  39 ++
 .../processors/task/monitor/ComputeTaskStatus.java | 227 ++++++++++
 .../task/monitor/ComputeTaskStatusEnum.java        |  32 ++
 .../task/monitor/ComputeTaskStatusSnapshot.java    |  89 ++++
 .../processors/compute/ComputeGridMonitorTest.java | 390 ++++++++++++++++++
 .../processors/compute/ComputeJobStatusTest.java   | 457 +++++++++++++++++++++
 .../junits/common/GridCommonAbstractTest.java      |  16 +
 .../testsuites/IgniteComputeGridTestSuite.java     |   7 +-
 17 files changed, 1711 insertions(+), 74 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobResultImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobResultImpl.java
index 8fbce3f..860b44d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobResultImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobResultImpl.java
@@ -43,19 +43,19 @@ public class GridJobResultImpl implements ComputeJobResult {
     /** */
     private ClusterNode node;
 
-    /** */
+    /** Guarded by {@code this}. */
     private Object data;
 
-    /** */
+    /** Guarded by {@code this}. */
     private IgniteException ex;
 
-    /** */
+    /** Guarded by {@code this}. */
     private boolean hasRes;
 
-    /** */
+    /** Guarded by {@code this}. */
     private boolean isCancelled;
 
-    /** */
+    /** Guarded by {@code this}. */
     private boolean isOccupied;
 
     /**
@@ -128,10 +128,12 @@ public class GridJobResultImpl implements ComputeJobResult {
      * @param jobAttrs Job attributes.
      * @param isCancelled Whether job was cancelled or not.
      */
-    public synchronized void onResponse(@Nullable Object data,
+    public synchronized void onResponse(
+        @Nullable Object data,
         @Nullable IgniteException ex,
         @Nullable Map<Object, Object> jobAttrs,
-        boolean isCancelled) {
+        boolean isCancelled
+    ) {
         this.data = data;
         this.ex = ex;
         this.isCancelled = isCancelled;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
index 45a0e94..1088032 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
@@ -43,6 +43,10 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.unmodifiableCollection;
+
 /**
  * Task session.
  */
@@ -74,7 +78,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
     /** */
     private Collection<ComputeJobSibling> siblings;
 
-    /** */
+    /** Guarded by {@link #mux}. */
     private Map<Object, Object> attrs;
 
     /** */
@@ -120,6 +124,17 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
     private final String execName;
 
     /**
+     * Nodes on which the jobs of the task will be executed.
+     * Guarded by {@link #mux}.
+     */
+    @Nullable private List<UUID> jobNodes;
+
+    /** User who created the session, {@code null} if security is not enabled. */
+    @Nullable private final Object login;
+
+    /**
+     * Constructor.
+     *
      * @param taskNodeId Task node ID.
      * @param taskName Task name.
      * @param dep Deployment.
@@ -135,6 +150,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
      * @param fullSup Session full support enabled flag.
      * @param internal Internal task flag.
      * @param execName Custom executor name.
+     * @param login User who created the session, {@code null} if security is not enabled.
      */
     public GridTaskSessionImpl(
         UUID taskNodeId,
@@ -151,7 +167,9 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         GridKernalContext ctx,
         boolean fullSup,
         boolean internal,
-        @Nullable String execName) {
+        @Nullable String execName,
+        @Nullable Object login
+    ) {
         assert taskNodeId != null;
         assert taskName != null;
         assert sesId != null;
@@ -169,7 +187,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         this.sesId = sesId;
         this.startTime = startTime;
         this.endTime = endTime;
-        this.siblings = siblings != null ? Collections.unmodifiableCollection(siblings) : null;
+        this.siblings = siblings != null ? unmodifiableCollection(siblings) : null;
         this.ctx = ctx;
 
         if (attrs != null && !attrs.isEmpty()) {
@@ -183,6 +201,8 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         this.execName = execName;
 
         mapFut = new IgniteFutureImpl(new GridFutureAdapter());
+
+        this.login = login;
     }
 
     /** {@inheritDoc} */
@@ -359,7 +379,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         checkFullSupport();
 
         if (keys.isEmpty())
-            return Collections.emptyMap();
+            return emptyMap();
 
         if (timeout == 0)
             timeout = Long.MAX_VALUE;
@@ -518,7 +538,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
      */
     public void setJobSiblings(Collection<ComputeJobSibling> siblings) {
         synchronized (mux) {
-            this.siblings = Collections.unmodifiableCollection(siblings);
+            this.siblings = unmodifiableCollection(siblings);
         }
     }
 
@@ -534,7 +554,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
             tmp.addAll(this.siblings);
             tmp.addAll(siblings);
 
-            this.siblings = Collections.unmodifiableCollection(tmp);
+            this.siblings = unmodifiableCollection(tmp);
         }
     }
 
@@ -606,7 +626,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         checkFullSupport();
 
         synchronized (mux) {
-            return attrs == null || attrs.isEmpty() ? Collections.emptyMap() : U.sealMap(attrs);
+            return attrs == null || attrs.isEmpty() ? emptyMap() : U.sealMap(attrs);
         }
     }
 
@@ -914,6 +934,42 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         return execName;
     }
 
+    /**
+     * Sets nodes on which the jobs of the task will be executed.
+     *
+     * @param jobNodes Nodes on which the jobs of the task will be executed.
+     */
+    public void jobNodes(Collection<UUID> jobNodes) {
+        synchronized (mux) {
+            this.jobNodes = F.isEmpty(jobNodes) ? emptyList() : new ArrayList<>(jobNodes);
+        }
+    }
+
+    /**
+     * @return Copy, made under {@link #mux}, of the nodes on which the jobs of the task will be executed (may be empty).
+     */
+    public List<UUID> jobNodesSafeCopy() {
+        synchronized (mux) {
+            return F.isEmpty(jobNodes) ? emptyList() : new ArrayList<>(jobNodes);
+        }
+    }
+
+    /**
+     * @return Copy, made under {@link #mux}, of all session attributes (may be empty); no full-support check is done.
+     */
+    public Map<Object, Object> attributesSafeCopy() {
+        synchronized (mux) {
+            return F.isEmpty(attrs) ? emptyMap() : new HashMap<>(attrs);
+        }
+    }
+
+    /**
+     * @return User who created the session, {@code null} if security is not enabled.
+     */
+    public Object login() {
+        return login;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridTaskSessionImpl.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/ComputeJobStatusEnum.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/ComputeJobStatusEnum.java
new file mode 100644
index 0000000..a700535
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/ComputeJobStatusEnum.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.job;
+
+/**
+ * Compute job status, as tracked by a job worker.
+ */
+public enum ComputeJobStatusEnum {
+    /** Initial status of a job worker, before the job is started. */
+    QUEUED,
+
+    /** Job has been started, or has been released from its last hold. */
+    RUNNING,
+
+    /** Job has been put on hold. */
+    SUSPENDED,
+
+    /** Job finished with an exception, timed out, or its reply failed because the task node left. */
+    FAILED,
+
+    /** Job has been cancelled. */
+    CANCELLED,
+
+    /** Job finished without an exception. */
+    FINISHED;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index f47db23..5abe0dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -36,6 +36,8 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDeploymentException;
 import org.apache.ignite.IgniteException;
@@ -80,7 +82,6 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -97,6 +98,8 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedHashMap;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingBy;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_JOBS_HISTORY_SIZE;
 import static org.apache.ignite.events.EventType.EVT_JOB_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -737,26 +740,18 @@ public class GridJobProcessor extends GridProcessorAdapter {
             // Put either job ID or session ID (they are unique).
             cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys);
 
-            IgnitePredicate<GridJobWorker> idsMatch = new P1<GridJobWorker>() {
-                @Override public boolean apply(GridJobWorker e) {
-                    return sesId != null ?
-                        jobId != null ?
-                            e.getSession().getId().equals(sesId) && e.getJobId().equals(jobId) :
-                            e.getSession().getId().equals(sesId) :
-                        e.getJobId().equals(jobId);
-                }
-            };
+            Predicate<GridJobWorker> idsMatch = idMatch(sesId, jobId);
 
             // If we don't have jobId then we have to iterate
             if (jobId == null) {
                 if (!jobAlwaysActivate) {
                     for (GridJobWorker job : passiveJobs.values()) {
-                        if (idsMatch.apply(job))
+                        if (idsMatch.test(job))
                             cancelPassiveJob(job);
                     }
                 }
                 for (GridJobWorker job : activeJobs.values()) {
-                    if (idsMatch.apply(job))
+                    if (idsMatch.test(job))
                         cancelActiveJob(job, sys);
                 }
             }
@@ -764,13 +759,13 @@ public class GridJobProcessor extends GridProcessorAdapter {
                 if (!jobAlwaysActivate) {
                     GridJobWorker passiveJob = passiveJobs.get(jobId);
 
-                    if (passiveJob != null && idsMatch.apply(passiveJob) && cancelPassiveJob(passiveJob))
+                    if (passiveJob != null && idsMatch.test(passiveJob) && cancelPassiveJob(passiveJob))
                         return;
                 }
 
                 GridJobWorker activeJob = activeJobs.get(jobId);
 
-                if (activeJob != null && idsMatch.apply(activeJob))
+                if (activeJob != null && idsMatch.test(activeJob))
                     cancelActiveJob(activeJob, sys);
             }
         }
@@ -1248,7 +1243,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
                             sesAttrs,
                             req.isSessionFullSupport(),
                             req.isInternal(),
-                            req.executorName());
+                            req.executorName(),
+                            ctx.security().enabled() ? ctx.security().securityContext().subject().login() : null
+                        );
 
                         taskSes.setCheckpointSpi(req.getCheckpointSpi());
                         taskSes.setClassLoader(dep.classLoader());
@@ -2295,4 +2292,34 @@ public class GridJobProcessor extends GridProcessorAdapter {
             return sizex();
         }
     }
+
+    /**
+     * @param sesId Task session ID, expected to be non-null.
+     * @return Per-status count of the task's active, cancelled and (unless {@code jobAlwaysActivate}) passive jobs.
+     */
+    public Map<ComputeJobStatusEnum, Long> jobStatuses(IgniteUuid sesId) {
+        return Stream.concat(
+                activeJobs.values().stream(),
+                jobAlwaysActivate ? cancelledJobs.values().stream() :
+                    Stream.concat(passiveJobs.values().stream(), cancelledJobs.values().stream())
+            )
+            .filter(idMatch(sesId, null))
+            .collect(groupingBy(GridJobWorker::status, counting()));
+    }
+
+    /**
+     * @param sesId Task session ID, {@code null} to match by job ID only.
+     * @param jobId Job ID, {@code null} to match by session ID only.
+     * @return Predicate matching job workers by every non-null ID given; at least one ID is expected to be non-null.
+     */
+    private Predicate<GridJobWorker> idMatch(@Nullable IgniteUuid sesId, @Nullable IgniteUuid jobId) {
+        assert sesId != null || jobId != null;
+
+        if (sesId == null)
+            return w -> jobId.equals(w.getJobId());
+        else if (jobId == null)
+            return w -> sesId.equals(w.getSession().getId());
+        else
+            return w -> sesId.equals(w.getSession().getId()) && jobId.equals(w.getJobId());
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index fa8d670..cb2a5f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -80,6 +80,12 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
 import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.QUEUED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.RUNNING;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.SUSPENDED;
 
 /**
  * Job worker.
@@ -179,6 +185,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
     /** Security context. */
     private final SecurityContext secCtx;
 
+    /** Job status. */
+    private volatile ComputeJobStatusEnum status = QUEUED;
+
     /**
      * @param ctx Kernal context.
      * @param dep Grid deployment.
@@ -442,9 +451,12 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
 
         boolean res;
 
-        if (res = holdLsnr.onHeld(this))
+        if (res = holdLsnr.onHeld(this)) {
             held.incrementAndGet();
 
+            status = SUSPENDED;
+        }
+
         return res;
     }
 
@@ -513,6 +525,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
 
         isStarted = true;
 
+        status = RUNNING;
+
         // Event notification.
         evtLsnr.onJobStarted(this);
 
@@ -565,8 +579,10 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                 super.cancel();
 
             if (!skipNtf) {
-                if (holdLsnr.onUnheld(this))
-                    held.decrementAndGet();
+                if (holdLsnr.onUnheld(this)) {
+                    if (held.decrementAndGet() == 0)
+                        status = RUNNING;
+                }
                 else {
                     if (log.isDebugEnabled())
                         log.debug("Ignoring job execution (job was not held).");
@@ -753,11 +769,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                 if (log.isDebugEnabled())
                     log.debug("Cancelling job: " + ses);
 
-                U.wrapThreadLoader(dep.classLoader(), new IgniteRunnable() {
-                    @Override public void run() {
-                        try (OperationSecurityContext c = ctx.security().withContext(secCtx)) {
-                            job0.cancel();
-                        }
+                status = CANCELLED;
+
+                U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> {
+                    try (OperationSecurityContext c = ctx.security().withContext(secCtx)) {
+                        job0.cancel();
                     }
                 });
             }
@@ -814,9 +830,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      * @param ex Error.
      * @param sndReply If {@code true}, reply will be sent.
      */
-    void finishJob(@Nullable Object res,
+    void finishJob(
+        @Nullable Object res,
         @Nullable IgniteException ex,
-        boolean sndReply) {
+        boolean sndReply
+    ) {
         finishJob(res, ex, sndReply, false);
     }
 
@@ -826,11 +844,12 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      * @param sndReply If {@code true}, reply will be sent.
      * @param retry If {@code true}, retry response will be sent.
      */
-    void finishJob(@Nullable Object res,
+    void finishJob(
+        @Nullable Object res,
         @Nullable IgniteException ex,
         boolean sndReply,
-        boolean retry)
-    {
+        boolean retry
+    ) {
         // Avoid finishing a job more than once from different threads.
         if (!finishing.compareAndSet(false, true))
             return;
@@ -859,6 +878,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                         U.warn(log, "Failed to reply to sender node because it left grid [nodeId=" + taskNode.id() +
                             ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + ']');
 
+                        status = FAILED;
+
                         // Record job reply failure.
                         if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                             evts = addEvent(evts, EVT_JOB_FAILED, "Job reply failed (task node left grid): " + job);
@@ -873,7 +894,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
 
                             Map<Object, Object> attrs = jobCtx.getAttributes();
 
-                            // Try serialize response, and if exception - return to client.
+                            // Try to serialize response, and if exception - return to client.
                             if (!loc) {
                                 try {
                                     resBytes = U.marshal(marsh, res);
@@ -924,6 +945,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                             }
 
                             if (ex != null) {
+                                status = FAILED;
+
                                 if (isStarted) {
                                     // Job failed.
                                     if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
@@ -934,8 +957,12 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                                     evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started " +
                                         "[ex=" + ex + ", job=" + job + ']');
                             }
-                            else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
-                                evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
+                            else {
+                                status = FINISHED;
+
+                                if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
+                                    evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
+                            }
 
                             GridJobExecuteResponse jobRes = new GridJobExecuteResponse(
                                 ctx.localNodeId(),
@@ -1006,6 +1033,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                 }
                 else {
                     if (ex != null) {
+                        status = FAILED;
+
                         if (isStarted) {
                             if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
                                 evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" + ex +
@@ -1015,13 +1044,21 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                             evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started [ex=" + ex +
                                 ", job=" + job + ']');
                     }
-                    else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
-                        evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
+                    else {
+                        status = FINISHED;
+
+                        if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
+                            evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
+                    }
                 }
             }
-            // Job timed out.
-            else if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
-                evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to timeout: " + job);
+            else {
+                // Job timed out.
+                status = FAILED;
+
+                if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
+                    evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to timeout: " + job);
+            }
         }
         finally {
             if (evts != null) {
@@ -1103,6 +1140,13 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
     }
 
+    /**
+     * @return Job status.
+     */
+    ComputeJobStatusEnum status() {
+        return status;
+    }
+
     /** {@inheritDoc} */
     @Override public int hashCode() {
         IgniteUuid jobId = ses.getJobId();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
index 3feeb65..a46b731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
@@ -65,6 +65,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Creates task session.
+     *
      * @param sesId Session ID.
      * @param taskNodeId Task node ID.
      * @param taskName Task name.
@@ -79,6 +81,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
      * @param fullSup {@code True} to enable distributed session attributes and checkpoints.
      * @param internal {@code True} in case of internal task.
      * @param execName Custom executor name.
+     * @param login User who created the session, {@code null} if security is not enabled.
      * @return New session if one did not exist, or existing one.
      */
     public GridTaskSessionImpl createTaskSession(
@@ -95,7 +98,9 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
         Map<Object, Object> attrs,
         boolean fullSup,
         boolean internal,
-        @Nullable String execName) {
+        @Nullable String execName,
+        @Nullable Object login
+    ) {
         if (!fullSup) {
             return new GridTaskSessionImpl(
                 taskNodeId,
@@ -112,7 +117,9 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
                 ctx,
                 false,
                 internal,
-                execName);
+                execName,
+                login
+            );
         }
 
         while (true) {
@@ -136,7 +143,10 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
                         ctx,
                         true,
                         internal,
-                        execName));
+                        execName,
+                        login
+                    )
+                );
 
                 if (old != null)
                     ses = old;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskEventListener.java
index 6c4556b..377cf48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskEventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskEventListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.task;
 import java.util.EventListener;
 import java.util.UUID;
 import org.apache.ignite.internal.GridJobSiblingImpl;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Listener for task events.
@@ -28,7 +29,14 @@ interface GridTaskEventListener extends EventListener {
     /**
      * @param worker Started grid task worker.
      */
-    public void onTaskStarted(GridTaskWorker<?, ?> worker);
+    void onTaskStarted(GridTaskWorker<?, ?> worker);
+
+    /**
+     * Callback on splitting the task into jobs.
+     *
+     * @param worker Grid task worker.
+     */
+    void onJobsMapped(GridTaskWorker<?, ?> worker);
 
     /**
      * @param worker Grid task worker.
@@ -41,16 +49,19 @@ interface GridTaskEventListener extends EventListener {
      * @param sib Job sibling.
      * @param nodeId Failover node ID.
      */
-    public void onJobFailover(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib, UUID nodeId);
+    void onJobFailover(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib, UUID nodeId);
 
     /**
      * @param worker Grid task worker.
      * @param sib Job sibling.
      */
-    public void onJobFinished(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib);
+    void onJobFinished(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib);
 
     /**
+     * Callback on finish of task execution.
+     *
      * @param worker Task worker for finished grid task.
+     * @param err Reason for the failure of the task, {@code null} if the task completed successfully.
      */
-    public void onTaskFinished(GridTaskWorker<?, ?> worker);
+    void onTaskFinished(GridTaskWorker<?, ?> worker, @Nullable Throwable err);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index da7f6ca..9624af8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
@@ -38,6 +39,7 @@ import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.compute.ComputeTaskMapAsync;
 import org.apache.ignite.compute.ComputeTaskName;
+import org.apache.ignite.compute.ComputeTaskSession;
 import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -55,6 +57,7 @@ import org.apache.ignite.internal.GridTaskSessionRequest;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
@@ -65,8 +68,12 @@ import org.apache.ignite.internal.managers.systemview.walker.ComputeTaskViewWalk
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.processors.task.monitor.ComputeGridMonitor;
+import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatus;
+import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusSnapshot;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
@@ -87,6 +94,7 @@ import org.apache.ignite.spi.systemview.view.ComputeTaskView;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptyMap;
 import static org.apache.ignite.events.EventType.EVT_MANAGEMENT_TASK_STARTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -160,6 +168,19 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
     private final boolean isPersistenceEnabled;
 
     /**
+     * Task statuses update monitors.
+     * Guarded by {@link #lock}.
+     */
+    private final Collection<ComputeGridMonitor> taskStatusMonitors = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Snapshots of task statuses.
+     * Mapping: {@link ComputeTaskSession#getId} -> task status.
+     * Guarded by {@link #lock}.
+     */
+    private final ConcurrentMap<IgniteUuid, ComputeTaskStatusSnapshot> taskStatusSnapshots = new ConcurrentHashMap<>();
+
+    /**
      * @param ctx Kernal context.
      */
     public GridTaskProcessor(GridKernalContext ctx) {
@@ -758,11 +779,13 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
             topPred,
             startTime,
             endTime,
-            Collections.<ComputeJobSibling>emptyList(),
-            Collections.emptyMap(),
+            Collections.emptyList(),
+            emptyMap(),
             fullSup,
             internal,
-            execName);
+            execName,
+            ctx.security().enabled() ? ctx.security().securityContext().subject().login() : null
+        );
 
         ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx);
 
@@ -1053,6 +1076,8 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
             ctx.event().record(evt);
         }
 
+        notifyTaskStatusMonitors(ComputeTaskStatus.snapshot(ses), false);
+
         IgniteCheckedException ex = null;
 
         // Every job gets an individual message to keep track of ghost requests.
@@ -1274,6 +1299,17 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
             // Register for timeout notifications.
             if (worker.endTime() < Long.MAX_VALUE)
                 ctx.timeout().addTimeoutObject(worker);
+
+            GridTaskSessionImpl session = worker.getSession();
+
+            notifyTaskStatusMonitors(ComputeTaskStatus.snapshot(session), false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onJobsMapped(GridTaskWorker<?, ?> worker) {
+            GridTaskSessionImpl session = worker.getSession();
+
+            notifyTaskStatusMonitors(ComputeTaskStatus.snapshot(session), false);
         }
 
         /** {@inheritDoc} */
@@ -1317,7 +1353,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
         }
 
         /** {@inheritDoc} */
-        @Override public void onTaskFinished(GridTaskWorker<?, ?> worker) {
+        @Override public void onTaskFinished(GridTaskWorker<?, ?> worker, @Nullable Throwable err) {
             GridTaskSessionImpl ses = worker.getSession();
 
             if (ses.isFullSupport()) {
@@ -1366,6 +1402,8 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
                     U.currentTimeMillis() - ses.getStartTime(),
                     worker.affPartId());
             }
+
+            notifyTaskStatusMonitors(ComputeTaskStatus.onFinishTask(worker.getSession(), err), true);
         }
     }
 
@@ -1552,4 +1590,97 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
             ? ", task name: " + task.getSession().getTaskName()
             : "";
     }
+
+    /**
+     * Subscribes the monitor to task status updates, under the write lock of {@link #lock}.
+     *
+     * <p>NOTE: {@link ComputeGridMonitor#processStatusSnapshots} is called once, on subscription, with the current
+     * snapshots; after that only {@link ComputeGridMonitor#processStatusChange} is called.
+     *
+     * @param monitor Task status update monitor; an error thrown by it during subscription is only logged.
+     * @throws NodeStoppingException If the node is stopping.
+     */
+    public void listenStatusUpdates(ComputeGridMonitor monitor) throws NodeStoppingException {
+        lock.writeLock();
+
+        try {
+            if (stopping)
+                throw new NodeStoppingException("Failed to add monitor due to grid shutdown: " + monitor);
+
+            taskStatusMonitors.add(monitor);
+
+            try {
+                monitor.processStatusSnapshots(taskStatusSnapshots.values());
+            }
+            catch (Throwable t) {
+                log.error("Error processing snapshots of task statuses: " + monitor, t);
+            }
+        }
+        finally {
+            lock.writeUnlock();
+        }
+    }
+
+    /**
+     * Unsubscribes the monitor from task status updates, under the write lock of {@link #lock}.
+     *
+     * @param monitor Task status update monitor.
+     */
+    public void stopListenStatusUpdates(ComputeGridMonitor monitor) {
+        lock.writeLock();
+
+        try {
+            taskStatusMonitors.remove(monitor);
+        }
+        finally {
+            lock.writeUnlock();
+        }
+    }
+
+    /**
+     * Updates {@link #taskStatusSnapshots} and notifies all {@link #taskStatusMonitors} about the task change
+     * while holding the read lock of {@link #lock}; an error thrown by a monitor is logged and does not stop the loop.
+     *
+     * @param snapshotChanges Changes to task status.
+     * @param remove {@code True} to remove the entry of the task session from {@link #taskStatusSnapshots},
+     *      {@code false} to put {@code snapshotChanges} into it.
+     */
+    private void notifyTaskStatusMonitors(ComputeTaskStatusSnapshot snapshotChanges, boolean remove) {
+        lock.readLock();
+
+        try {
+            if (remove)
+                taskStatusSnapshots.remove(snapshotChanges.sessionId());
+            else
+                taskStatusSnapshots.put(snapshotChanges.sessionId(), snapshotChanges);
+
+            for (ComputeGridMonitor monitor : taskStatusMonitors) {
+                try {
+                    monitor.processStatusChange(snapshotChanges);
+                }
+                catch (Throwable t) {
+                    log.error("Error processing task status diff: " + monitor, t);
+                }
+            }
+        }
+        finally {
+            lock.readUnlock();
+        }
+    }
+
+    /**
+     * Collects statistics on jobs locally, only for those jobs that have
+     * already sent a response or are being executed locally.
+     *
+     * @param sesId Task session ID.
+     * @return Job statistics for the task (job status -> count of jobs), empty if no task worker is found by the ID.
+     */
+    public Map<ComputeJobStatusEnum, Long> jobStatuses(IgniteUuid sesId) {
+        GridTaskWorker<?, ?> taskWorker = tasks.get(sesId);
+
+        if (taskWorker == null)
+            return emptyMap();
+        else
+            return taskWorker.jobStatuses();
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 6cdcd79..4055d6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -67,6 +68,7 @@ import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.closure.AffinityTask;
+import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
 import org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -86,6 +88,8 @@ import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.resources.TaskContinuousMapperResource;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
 import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
 import static org.apache.ignite.compute.ComputeJobResultPolicy.WAIT;
 import static org.apache.ignite.events.EventType.EVT_JOB_FAILED_OVER;
@@ -100,6 +104,9 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
 import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
@@ -158,7 +165,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
     /** */
     private final GridTaskEventListener evtLsnr;
 
-    /** */
+    /** Guarded by {@link #mux}. */
     private Map<IgniteUuid, GridJobResultImpl> jobRes;
 
     /** */
@@ -499,8 +506,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
             ses.setClassLoader(dep.classLoader());
 
             // Nodes are ignored by affinity tasks.
-            final List<ClusterNode> shuffledNodes =
-                affCacheIds == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
+            final List<ClusterNode> shuffledNodes = affCacheIds == null ? getTaskTopology() : emptyList();
 
             // Load balancer.
             ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);
@@ -513,16 +519,15 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
             // Inject resources.
             ctx.resource().inject(dep, task, ses, balancer, mapper);
 
-            Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(),
-                new Callable<Map<? extends ComputeJob, ClusterNode>>() {
-                    @Override public Map<? extends ComputeJob, ClusterNode> call() {
-                        return task.map(shuffledNodes, arg);
-                    }
-                });
+            Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(
+                dep.classLoader(),
+                (Callable<Map<? extends ComputeJob, ClusterNode>>)() -> task.map(shuffledNodes, arg)
+            );
 
-            if (log.isDebugEnabled())
+            if (log.isDebugEnabled()) {
                 log.debug("Mapped task jobs to nodes [jobCnt=" + (mappedJobs != null ? mappedJobs.size() : 0) +
                     ", mappedJobs=" + mappedJobs + ", ses=" + ses + ']');
+            }
 
             if (F.isEmpty(mappedJobs)) {
                 synchronized (mux) {
@@ -634,6 +639,10 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
             }
         }
 
+        ses.jobNodes(F.viewReadOnly(jobs.values(), F.node2id()));
+
+        evtLsnr.onJobsMapped(this);
+
         // Set mapped flag.
         ses.onMapped();
 
@@ -854,7 +863,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
                 List<ComputeJobResult> results;
 
                 if (!resCache)
-                    results = Collections.emptyList();
+                    results = emptyList();
                 else {
                     synchronized (mux) {
                         results = getRemoteResults();
@@ -1640,7 +1649,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
                 recordTaskEvent(EVT_TASK_FAILED, "Task failed.");
 
             // Clean resources prior to finishing future.
-            evtLsnr.onTaskFinished(this);
+            evtLsnr.onTaskFinished(this, e);
 
             if (cancelChildren)
                 cancelChildren();
@@ -1674,6 +1683,57 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
         return affPartId;
     }
 
+    /**
+     * Collects statistics on jobs locally, only for those jobs that have
+     * already sent a response or are being executed locally.
+     *
+     * @return Job statistics for the task (job status -> count of jobs), empty if there are no job results yet.
+     */
+    Map<ComputeJobStatusEnum, Long> jobStatuses() {
+        List<GridJobResultImpl> jobResults = null;
+
+        synchronized (mux) { // Copy under the mutex, the copy is inspected below without holding it.
+            if (jobRes != null)
+                jobResults = new ArrayList<>(jobRes.values());
+        }
+
+        // Jobs have not been mapped yet.
+        if (F.isEmpty(jobResults))
+            return emptyMap();
+
+        UUID locNodeId = ctx.localNodeId();
+
+        boolean getLocJobStatistics = false; // Set when a job without a response is mapped to the local node.
+
+        Map<ComputeJobStatusEnum, Long> res = new EnumMap<>(ComputeJobStatusEnum.class);
+
+        for (GridJobResultImpl jobResult : jobResults) {
+            if (jobResult.hasResponse()) {
+                ComputeJobStatusEnum jobStatus;
+
+                if (jobResult.isCancelled())
+                    jobStatus = CANCELLED;
+                else if (jobResult.getException() != null)
+                    jobStatus = FAILED;
+                else
+                    jobStatus = FINISHED;
+
+                res.merge(jobStatus, 1L, Long::sum);
+            }
+            else if (!getLocJobStatistics && locNodeId.equals(jobResult.getNode().id()))
+                getLocJobStatistics = true;
+        }
+
+        if (getLocJobStatistics) {
+            Map<ComputeJobStatusEnum, Long> jobStatuses = ctx.job().jobStatuses(getTaskSessionId());
+
+            for (Map.Entry<ComputeJobStatusEnum, Long> e : jobStatuses.entrySet())
+                res.merge(e.getKey(), e.getValue(), Long::sum);
+        }
+
+        return res;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object obj) {
         if (this == obj)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeGridMonitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeGridMonitor.java
new file mode 100644
index 0000000..82bce5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeGridMonitor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task.monitor;
+
+import java.util.Collection;
+
+/**
+ * Monitor that is notified about task status updates.
+ */
+public interface ComputeGridMonitor {
+    /**
+     * Processes snapshots of task statuses, called when the monitor is subscribed.
+     *
+     * @param snapshots Snapshots of tasks.
+     */
+    void processStatusSnapshots(Collection<ComputeTaskStatusSnapshot> snapshots);
+
+    /**
+     * Processes a change of a task status.
+     *
+     * @param snapshot Snapshot of the task after the change.
+     */
+    void processStatusChange(ComputeTaskStatusSnapshot snapshot);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatus.java
new file mode 100644
index 0000000..ef4e40f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatus.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task.monitor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridTaskSessionImpl;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.FINISHED;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.RUNNING;
+
+/**
+ * Task status container, instances are created by {@link #snapshot} and {@link #onFinishTask}.
+ *
+ * @see ComputeTaskStatusSnapshot
+ */
+public class ComputeTaskStatus implements ComputeTaskStatusSnapshot {
+    /** Session ID of the task being executed. */
+    private final IgniteUuid sessionId;
+
+    /** Status of the task. */
+    private final ComputeTaskStatusEnum status;
+
+    /** Task name of the task this session belongs to. */
+    private final String taskName;
+
+    /** ID of the node on which task execution originated. */
+    private final UUID originatingNodeId;
+
+    /** Start of computation time for the task. */
+    private final long startTime;
+
+    /** End of computation time for the task, {@code 0} for a task that is still running. */
+    private final long endTime;
+
+    /** IDs of the nodes on which the task jobs will execute. */
+    private final List<UUID> jobNodes;
+
+    /** All session attributes. */
+    private final Map<?, ?> attributes;
+
+    /** Reason for the failure of the task. */
+    @Nullable private final Throwable failReason;
+
+    /** Availability of changing task attributes. */
+    private final boolean fullSupport;
+
+    /** User who created the task, {@code null} if security is not available. */
+    @Nullable private final Object createdBy;
+
+    /** Internal task flag. */
+    private final boolean internal;
+
+    /**
+     * Constructor for a new task.
+     *
+     * @param sessionId Session ID of the task being executed.
+     * @param status Status of the task.
+     * @param taskName Task name of the task this session belongs to.
+     * @param originatingNodeId ID of the node on which task execution originated.
+     * @param startTime Start of computation time for the task.
+     * @param endTime End of computation time for the task.
+     * @param jobNodes IDs of the nodes on which the task jobs will execute, empty list is used if {@code null}.
+     * @param attributes All session attributes, empty map is used if {@code null}.
+     * @param failReason Reason for the failure of the task.
+     * @param fullSupport Availability of changing task attributes.
+     * @param createdBy User who created the task, {@code null} if security is not available.
+     * @param internal Internal task flag.
+     */
+    private ComputeTaskStatus(
+        IgniteUuid sessionId,
+        ComputeTaskStatusEnum status,
+        String taskName,
+        UUID originatingNodeId,
+        long startTime,
+        long endTime,
+        List<UUID> jobNodes,
+        Map<?, ?> attributes,
+        @Nullable Throwable failReason,
+        boolean fullSupport,
+        @Nullable Object createdBy,
+        boolean internal
+    ) {
+        this.sessionId = sessionId;
+        this.status = status;
+        this.taskName = taskName;
+        this.originatingNodeId = originatingNodeId;
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.jobNodes = F.isEmpty(jobNodes) ? emptyList() : jobNodes;
+        this.attributes = F.isEmpty(attributes) ? emptyMap() : attributes;
+        this.failReason = failReason;
+        this.fullSupport = fullSupport;
+        this.createdBy = createdBy;
+        this.internal = internal;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid sessionId() {
+        return sessionId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String taskName() {
+        return taskName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID originatingNodeId() {
+        return originatingNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long startTime() {
+        return startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return endTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<UUID> jobNodes() {
+        return jobNodes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<?, ?> attributes() {
+        return attributes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeTaskStatusEnum status() {
+        return status;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable Throwable failReason() {
+        return failReason;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean fullSupport() {
+        return fullSupport;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable Object createBy() {
+        return createdBy;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean internal() {
+        return internal;
+    }
+
+    /**
+     * Creates the status of a task that is in progress: status {@code RUNNING}, end time {@code 0}, no fail reason.
+     *
+     * @param sessionImp Task session.
+     * @return New instance.
+     */
+    public static ComputeTaskStatus snapshot(GridTaskSessionImpl sessionImp) {
+        return new ComputeTaskStatus(
+            sessionImp.getId(),
+            RUNNING,
+            sessionImp.getTaskName(),
+            sessionImp.getTaskNodeId(),
+            sessionImp.getStartTime(),
+            0L,
+            sessionImp.jobNodesSafeCopy(),
+            sessionImp.attributesSafeCopy(),
+            null,
+            sessionImp.isFullSupport(),
+            sessionImp.login(),
+            sessionImp.isInternal()
+        );
+    }
+
+    /**
+     * Creates the status of a finished task: {@code FINISHED} if {@code err} is {@code null}, otherwise {@code FAILED}.
+     *
+     * @param sessionImp Task session.
+     * @param err Reason for the failure of the task, {@code null} if the task completed successfully.
+     * @return New instance.
+     */
+    public static ComputeTaskStatus onFinishTask(GridTaskSessionImpl sessionImp, @Nullable Throwable err) {
+        return new ComputeTaskStatus(
+            sessionImp.getId(),
+            err == null ? FINISHED : FAILED,
+            sessionImp.getTaskName(),
+            sessionImp.getTaskNodeId(),
+            sessionImp.getStartTime(),
+            U.currentTimeMillis(),
+            sessionImp.jobNodesSafeCopy(),
+            sessionImp.attributesSafeCopy(),
+            err,
+            sessionImp.isFullSupport(),
+            sessionImp.login(),
+            sessionImp.isInternal()
+        );
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusEnum.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusEnum.java
new file mode 100644
index 0000000..06faa8b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusEnum.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task.monitor;
+
+/**
+ * Status of a compute task.
+ */
+public enum ComputeTaskStatusEnum {
+    /** Task is in progress. */
+    RUNNING,
+
+    /** Task has finished without an error. */
+    FINISHED,
+
+    /** Task has finished with an error. */
+    FAILED;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusSnapshot.java
new file mode 100644
index 0000000..995e3b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusSnapshot.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task.monitor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Snapshot of the task status.
+ */
+public interface ComputeTaskStatusSnapshot {
+    /**
+     * @return Session ID of the task being executed.
+     */
+    IgniteUuid sessionId();
+
+    /**
+     * @return Task name of the task this session belongs to.
+     */
+    String taskName();
+
+    /**
+     * @return ID of the node on which task execution originated.
+     */
+    UUID originatingNodeId();
+
+    /**
+     * @return Start of computation time for the task.
+     */
+    long startTime();
+
+    /**
+     * @return End of computation time for the task.
+     */
+    long endTime();
+
+    /**
+     * @return IDs of the nodes on which the task jobs will execute.
+     */
+    List<UUID> jobNodes();
+
+    /**
+     * @return All session attributes.
+     */
+    Map<?, ?> attributes();
+
+    /**
+     * @return Status of the task.
+     */
+    ComputeTaskStatusEnum status();
+
+    /**
+     * @return Reason for the failure of the task, {@code null} if there is none.
+     */
+    @Nullable Throwable failReason();
+
+    /**
+     * @return {@code True} if change of task attributes is available.
+     */
+    boolean fullSupport();
+
+    /**
+     * @return User who created the task, {@code null} if security is not available.
+     */
+    @Nullable Object createBy();
+
+    /**
+     * @return {@code True} if task is internal.
+     */
+    boolean internal();
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
new file mode 100644
index 0000000..ebb5be1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.GridTaskSessionImpl;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.task.monitor.ComputeGridMonitor;
+import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum;
+import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusSnapshot;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.FINISHED;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.RUNNING;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Test class for {@link ComputeGridMonitor}.
+ */
+public class ComputeGridMonitorTest extends GridCommonAbstractTest {
+    /** Coordinator. */
+    private static IgniteEx CRD;
+
+    /** Compute task status monitor. */
+    private ComputeGridMonitorImpl monitor;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        stopAllGrids();
+
+        IgniteEx crd = startGrids(2);
+
+        crd.cluster().state(ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        CRD = crd;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+
+        CRD = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        CRD.context().task().listenStatusUpdates(monitor = new ComputeGridMonitorImpl());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        CRD.context().task().stopListenStatusUpdates(monitor);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new StopNodeFailureHandler();
+    }
+
+    /**
+     * Checks the status changes reported for a successfully executed task.
+     */
+    @Test
+    public void simpleTest() {
+        ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(new NoopComputeTask(), null);
+
+        taskFut.get(getTestTimeout());
+
+        assertTrue(monitor.statusSnapshots.isEmpty());
+
+        assertEquals(3, monitor.statusChanges.size());
+
+        checkTaskStarted(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskMapped(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskFinished(monitor.statusChanges.poll(), taskFut.getTaskSession());
+    }
+
+    /**
+     * Checks the status changes reported for a task whose {@code reduce} step throws.
+     */
+    @Test
+    public void failTaskTest() {
+        NoopComputeTask task = new NoopComputeTask() {
+            /**
+             * {@inheritDoc}
+             */
+            @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+                throw new IgniteException("FAIL TASK");
+            }
+        };
+
+        ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(task, null);
+
+        assertThrows(log, () -> taskFut.get(getTestTimeout()), IgniteException.class, null);
+
+        assertTrue(monitor.statusSnapshots.isEmpty());
+
+        assertEquals(3, monitor.statusChanges.size());
+
+        checkTaskStarted(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskMapped(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskFailed(monitor.statusChanges.poll(), taskFut.getTaskSession());
+    }
+
+    /**
+     * Checks the status changes reported when a task session attribute is set while the task runs.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void changeAttributesTest() throws Exception {
+        ComputeFullWithWaitTask task = new ComputeFullWithWaitTask(getTestTimeout());
+
+        ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(task, null);
+
+        task.doneOnMapFut.get(getTestTimeout());
+
+        taskFut.getTaskSession().setAttribute("test", "test");
+
+        assertEquals(
+            "test",
+            taskFut.getTaskSession().waitForAttribute("test", getTestTimeout())
+        );
+
+        taskFut.get(getTestTimeout());
+
+        assertTrue(monitor.statusSnapshots.isEmpty());
+
+        assertEquals(4, monitor.statusChanges.size());
+
+        checkTaskStarted(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskMapped(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkAttributeChanged(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskFinished(monitor.statusChanges.poll(), taskFut.getTaskSession());
+    }
+
+    /**
+     * Checks that a monitor registered while a task is running receives a status snapshot of that task.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void snapshotsTest() throws Exception {
+        ComputeFullWithWaitTask task = new ComputeFullWithWaitTask(getTestTimeout());
+
+        ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(task, null);
+
+        task.doneOnMapFut.get(getTestTimeout());
+
+        ComputeGridMonitorImpl monitor1 = new ComputeGridMonitorImpl();
+
+        try {
+            CRD.context().task().listenStatusUpdates(monitor1);
+
+            assertTrue(monitor.statusSnapshots.isEmpty());
+
+            assertEquals(1, monitor1.statusSnapshots.size());
+
+            checkSnapshot(monitor1.statusSnapshots.poll(), taskFut.getTaskSession());
+        }
+        finally {
+            CRD.context().task().stopListenStatusUpdates(monitor1);
+        }
+
+        taskFut.get(getTestTimeout());
+    }
+
+    /** Expects {@code RUNNING} with empty job nodes; attributes are not compared. */
+    private void checkTaskStarted(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, false, false);
+    }
+
+    /** Expects {@code RUNNING} with job nodes matching the session topology; attributes are not compared. */
+    private void checkTaskMapped(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, true, false);
+    }
+
+    /** Expects {@code RUNNING}; job nodes and (for full-support sessions) attributes are compared too. */
+    private void checkAttributeChanged(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, true, true);
+    }
+
+    /** Expects {@code FINISHED}; job nodes and (for full-support sessions) attributes are compared too. */
+    private void checkTaskFinished(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, FINISHED, true, true);
+    }
+
+    /** Expects {@code FAILED}; job nodes and (for full-support sessions) attributes are compared too. */
+    private void checkTaskFailed(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, FAILED, true, true);
+    }
+
+    /** Checks a snapshot of a running task: {@code RUNNING}, with job nodes and attributes compared. */
+    private void checkSnapshot(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, true, true);
+    }
+
+    /** Asserts that {@code snapshot} matches {@code session} and carries the expected status. */
+    private void checkSnapshot(
+        ComputeTaskStatusSnapshot snapshot,
+        GridTaskSessionImpl session,
+        ComputeTaskStatusEnum expStatus,
+        boolean checkJobNodes,
+        boolean checkAttributes
+    ) {
+        assertEquals(session.getId(), snapshot.sessionId());
+        assertEquals(expStatus, snapshot.status());
+
+        assertEquals(session.getTaskName(), snapshot.taskName());
+        assertEquals(session.getTaskNodeId(), snapshot.originatingNodeId());
+        assertEquals(session.getStartTime(), snapshot.startTime());
+        assertEquals(session.isFullSupport(), snapshot.fullSupport());
+        assertEquals(session.isInternal(), snapshot.internal());
+
+        checkLogin(session, snapshot);
+
+        if (checkJobNodes) {
+            assertEquals(
+                new TreeSet<>(session.getTopology()),
+                new TreeSet<>(snapshot.jobNodes())
+            );
+        }
+        else
+            assertTrue(snapshot.jobNodes().isEmpty());
+
+        if (checkAttributes && session.isFullSupport()) {
+            assertEquals(
+                new TreeMap<>(session.getAttributes()),
+                new TreeMap<>(snapshot.attributes())
+            );
+        }
+
+        if (expStatus == FINISHED) {
+            assertTrue(snapshot.endTime() > 0L);
+            assertNull(snapshot.failReason());
+        }
+        else if (expStatus == FAILED) {
+            assertTrue(snapshot.endTime() > 0L);
+            assertNotNull(snapshot.failReason());
+        }
+        else {
+            assertEquals(0L, snapshot.endTime());
+            assertNull(snapshot.failReason());
+        }
+    }
+
+    /** Monitor that stores everything it receives so that tests can inspect it afterwards. */
+    private static class ComputeGridMonitorImpl implements ComputeGridMonitor {
+        /** Snapshots received through {@code processStatusSnapshots}. */
+        final Queue<ComputeTaskStatusSnapshot> statusSnapshots = new ConcurrentLinkedQueue<>();
+
+        /** Status changes received through {@code processStatusChange}. */
+        final Queue<ComputeTaskStatusSnapshot> statusChanges = new ConcurrentLinkedQueue<>();
+
+        /** {@inheritDoc} */
+        @Override public void processStatusSnapshots(Collection<ComputeTaskStatusSnapshot> snapshots) {
+            statusSnapshots.addAll(snapshots);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processStatusChange(ComputeTaskStatusSnapshot snapshot) {
+            statusChanges.add(snapshot);
+        }
+    }
+
+    /** Task that maps one {@link NoopComputeJob} to every node of the subgrid and reduces to {@code null}. */
+    private static class NoopComputeTask extends ComputeTaskAdapter<Void, Void> {
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            Void arg
+        ) throws IgniteException {
+            return subgrid.stream().collect(toMap(n -> new NoopComputeJob(), identity()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+    }
+
+    /** Task annotated for full session support whose jobs call {@code U.sleep(500)} before completing. */
+    @ComputeTaskSessionFullSupport
+    private static class ComputeFullWithWaitTask extends ComputeTaskAdapter<Void, Void> {
+        /** Completed at the start of {@code map}, so tests can wait until mapping has begun. */
+        final GridFutureAdapter<Void> doneOnMapFut = new GridFutureAdapter<>();
+
+        /** Timeout supplied by the test; only stored, not read within this class. */
+        final long timeout;
+
+        /** @param timeout Timeout to store. */
+        public ComputeFullWithWaitTask(long timeout) {
+            this.timeout = timeout;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            Void arg
+        ) throws IgniteException {
+            doneOnMapFut.onDone();
+
+            return subgrid.stream().collect(toMap(n -> new NoopComputeJob() {
+                /** {@inheritDoc} */
+                @Override public Object execute() throws IgniteException {
+                    try {
+                        U.sleep(500);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+
+                    return super.execute();
+                }
+            }, identity()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+    }
+
+    /** Job that does nothing and returns {@code null}. */
+    private static class NoopComputeJob extends ComputeJobAdapter {
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            return null;
+        }
+    }
+
+    /**
+     * @param session Task session.
+     * @param snapshot Task status snapshot.
+     */
+    protected void checkLogin(GridTaskSessionImpl session, ComputeTaskStatusSnapshot snapshot) {
+        assertNull(session.login());
+        assertNull(snapshot.createBy());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
new file mode 100644
index 0000000..17c783d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.compute;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.collision.GridCollisionManager;
+import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.internal.processors.task.GridTaskProcessor;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.CollisionExternalListener;
+import org.apache.ignite.spi.collision.CollisionJobContext;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.emptyMap;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.QUEUED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.RUNNING;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.SUSPENDED;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
+/**
+ * Class for testing {@link GridTaskProcessor#jobStatuses} and {@link GridJobProcessor#jobStatuses}.
+ */
+public class ComputeJobStatusTest extends GridCommonAbstractTest {
+    /** Coordinator. */
+    private static IgniteEx node0;
+
+    /** Second node. */
+    private static IgniteEx node1;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        stopAllGrids();
+
+        IgniteEx crd = startGrids(2);
+
+        crd.cluster().state(ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        node0 = crd;
+        node1 = grid(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+
+        node0 = null;
+        node1 = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        applyAllNodes(PriorityQueueCollisionSpiEx::reset);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setCollisionSpi(new PriorityQueueCollisionSpiEx())
+            // Disable automatic update of metrics, which can call PriorityQueueCollisionSpi.onCollision
+            // - leads to the activation (execute) of jobs.
+            .setMetricsUpdateFrequency(Long.MAX_VALUE)
+            .setClientFailureDetectionTimeout(Long.MAX_VALUE);
+    }
+
+    /**
+     * Checks that requesting job statuses for a non-existent task session yields empty results without errors.
+     */
+    @Test
+    public void testNoStatistics() {
+        IgniteUuid sesId = IgniteUuid.fromUuid(UUID.randomUUID());
+
+        checkTaskJobStatuses(sesId, null, null);
+        checkJobJobStatuses(sesId, null, null);
+    }
+
+    /**
+     * Checks that the statuses of the job will be:
+     * {@link ComputeJobStatusEnum#QUEUED} -> {@link ComputeJobStatusEnum#RUNNING} -> {@link ComputeJobStatusEnum#FINISHED}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFinishedTasks() throws Exception {
+        checkJobStatuses(FINISHED);
+    }
+
+    /**
+     * Checks that the statuses of the job will be:
+     * {@link ComputeJobStatusEnum#QUEUED} -&gt; {@link ComputeJobStatusEnum#RUNNING} -&gt; {@link ComputeJobStatusEnum#FAILED}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFailedTasks() throws Exception {
+        checkJobStatuses(FAILED);
+    }
+
+    /**
+     * Checks that the statuses of the job will be:
+     * {@link ComputeJobStatusEnum#QUEUED} -&gt; {@link ComputeJobStatusEnum#RUNNING} -&gt; {@link ComputeJobStatusEnum#CANCELLED}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void tesCancelledTasks() throws Exception {
+        checkJobStatuses(CANCELLED);
+    }
+
+    /**
+     * Checks that the statuses of the job will be:
+     * {@link ComputeJobStatusEnum#QUEUED} -&gt; {@link ComputeJobStatusEnum#RUNNING} -&gt;
+     * {@link ComputeJobStatusEnum#SUSPENDED} -&gt; {@link ComputeJobStatusEnum#FINISHED}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void tesSuspendedTasks() throws Exception {
+        checkJobStatuses(SUSPENDED);
+    }
+
+    /** Runs a {@link WaitJob} per node and checks job statuses while node0's job is driven to {@code exp}. */
+    private void checkJobStatuses(ComputeJobStatusEnum exp) throws Exception {
+        applyAllNodes(spiEx -> spiEx.waitJobCls = WaitJob.class);
+
+        ComputeTaskFuture<Void> taskFut = node0.compute().executeAsync(
+            new SimpleTask(() -> new WaitJob(getTestTimeout())),
+            null
+        );
+
+        // We are waiting for the jobs (PriorityQueueCollisionSpiEx#waitJobCls == WaitJob.class)
+        // to be received on the nodes to ensure that the correct statistics are obtained.
+        applyAllNodes(spiEx -> spiEx.waitJobFut.get(getTestTimeout()));
+
+        IgniteUuid sesId = taskFut.getTaskSession().getId();
+
+        checkTaskJobStatuses(sesId, QUEUED, null);
+        checkJobJobStatuses(sesId, QUEUED, QUEUED);
+
+        PriorityQueueCollisionSpiEx spiEx0 = PriorityQueueCollisionSpiEx.spiEx(node0);
+        PriorityQueueCollisionSpiEx spiEx1 = PriorityQueueCollisionSpiEx.spiEx(node1);
+
+        WaitJob waitJob0 = spiEx0.task();
+        WaitJob waitJob1 = spiEx1.task();
+
+        // Activating and waiting for a job (WaitJob) to start on node0.
+        spiEx0.handleCollisions();
+        waitJob0.onStartFut.get(getTestTimeout());
+
+        checkTaskJobStatuses(sesId, RUNNING, null);
+        checkJobJobStatuses(sesId, RUNNING, QUEUED);
+
+        // Activating and waiting for a job (WaitJob) to start on node1.
+        spiEx1.handleCollisions();
+        waitJob1.onStartFut.get(getTestTimeout());
+
+        checkTaskJobStatuses(sesId, RUNNING, null);
+        checkJobJobStatuses(sesId, RUNNING, RUNNING);
+
+        switch (exp) {
+            case FINISHED:
+                // Just letting job (WaitJob) finish on node0.
+                waitJob0.waitFut.onDone();
+                break;
+
+            case FAILED:
+                // Finish the job (WaitJob) with an error on node0.
+                waitJob0.waitFut.onDone(new Exception("from test"));
+                break;
+
+            case CANCELLED:
+                // Cancel the job (WaitJob) on node0.
+                node0.context().job().cancelJob(
+                    sesId,
+                    spiEx0.waitJobFut.result().getJobContext().getJobId(),
+                    false
+                );
+                break;
+
+            case SUSPENDED:
+                // Hold the job (WaitJob) on node0.
+                spiEx0.waitJobFut.result().getJobContext().holdcc();
+                break;
+
+            default:
+                fail("Unknown: " + exp);
+        }
+
+        // Let's wait a bit for the operation (above) to complete.
+        U.sleep(100);
+
+        checkTaskJobStatuses(sesId, exp, null);
+
+        if (exp == SUSPENDED) {
+            // Must resume (unhold) the job (WaitJob) to finish correctly.
+            checkJobJobStatuses(sesId, exp, RUNNING);
+            waitJob0.waitFut.onDone();
+            spiEx0.waitJobFut.result().getJobContext().callcc();
+
+            U.sleep(100);
+
+            checkTaskJobStatuses(sesId, FINISHED, null);
+        }
+
+        // Let's check that the job (WaitJob) on the node0 has finished
+        // and that the statistics about it will be empty (on node0).
+        checkJobJobStatuses(sesId, null, RUNNING);
+
+        // Let's finish the job (WaitJob) on node1.
+        waitJob1.waitFut.onDone();
+
+        taskFut.get(getTestTimeout());
+
+        // After the completion of the task, we will no longer receive statistics about it.
+        checkTaskJobStatuses(sesId, null, null);
+        checkJobJobStatuses(sesId, null, null);
+    }
+
+    /** Checks {@code task().jobStatuses(sesId)} on both nodes; {@code null} means an empty map is expected. */
+    private void checkTaskJobStatuses(
+        IgniteUuid sesId,
+        @Nullable ComputeJobStatusEnum expN0,
+        @Nullable ComputeJobStatusEnum expN1
+    ) {
+        Map<ComputeJobStatusEnum, Long> exp0 = expN0 == null ? emptyMap() : F.asMap(expN0, 1L);
+        Map<ComputeJobStatusEnum, Long> exp1 = expN1 == null ? emptyMap() : F.asMap(expN1, 1L);
+
+        assertEqualsMaps(exp0, node0.context().task().jobStatuses(sesId));
+        assertEqualsMaps(exp1, node1.context().task().jobStatuses(sesId));
+    }
+
+    /** Checks {@code job().jobStatuses(sesId)} on both nodes; {@code null} means an empty map is expected. */
+    private void checkJobJobStatuses(
+        IgniteUuid sesId,
+        @Nullable ComputeJobStatusEnum expN0,
+        @Nullable ComputeJobStatusEnum expN1
+    ) {
+        Map<ComputeJobStatusEnum, Long> exp0 = expN0 == null ? emptyMap() : F.asMap(expN0, 1L);
+        Map<ComputeJobStatusEnum, Long> exp1 = expN1 == null ? emptyMap() : F.asMap(expN1, 1L);
+
+        assertEqualsMaps(exp0, node0.context().job().jobStatuses(sesId));
+        assertEqualsMaps(exp1, node1.context().job().jobStatuses(sesId));
+    }
+
+    /** Applies {@code c} to the collision SPI of every grid returned by {@code G.allGrids()}. */
+    private void applyAllNodes(ConsumerX<PriorityQueueCollisionSpiEx> c) throws Exception {
+        for (Ignite n : G.allGrids())
+            c.accept(PriorityQueueCollisionSpiEx.spiEx(n));
+    }
+
+    /** Collision SPI that captures a waiting job of a given class and delegates to the parent SPI only on demand. */
+    private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
+        /** When {@code true}, {@code onCollision} delegates to the parent SPI. */
+        volatile boolean handleCollision;
+
+        /** Class of the job to capture in {@link #waitJobFut}, {@code null} to capture nothing. */
+        @Nullable volatile Class<? extends ComputeJob> waitJobCls;
+
+        /** Completed with the context of a waiting job that is an instance of {@link #waitJobCls}. */
+        final GridFutureAdapter<CollisionJobContext> waitJobFut = new GridFutureAdapter<>();
+
+        /** {@inheritDoc} */
+        @Override public void onCollision(CollisionContext ctx) {
+            if (!waitJobFut.isDone()) {
+                Class<? extends ComputeJob> waitJobCls = this.waitJobCls;
+
+                if (waitJobCls != null)
+                    ctx.waitingJobs().stream()
+                        .filter(jobCtx -> waitJobCls.isInstance(jobCtx.getJob()))
+                        .findAny()
+                        .ifPresent(waitJobFut::onDone);
+            }
+
+            if (handleCollision)
+                super.onCollision(ctx);
+        }
+
+        /** Restores the initial state: no delegation, no job class to capture and a reset future. */
+        void reset() {
+            handleCollision = false;
+
+            waitJobCls = null;
+
+            waitJobFut.reset();
+        }
+
+        /** Enables delegation and fires the external collision listener read from the collision manager. */
+        void handleCollisions() {
+            handleCollision = true;
+
+            GridCollisionManager collision = ((IgniteEx)ignite).context().collision();
+
+            AtomicReference<CollisionExternalListener> extLsnr = getFieldValue(collision, "extLsnr");
+
+            CollisionExternalListener lsnr = extLsnr.get();
+
+            assertNotNull(lsnr);
+
+            lsnr.onExternalCollision();
+        }
+
+        /** @return Job held by the context captured in {@link #waitJobFut}, cast to the caller's type. */
+        <T> T task() {
+            return (T)waitJobFut.result().getJob();
+        }
+
+        /** @return Collision SPI from the configuration of {@code n}, cast to this class. */
+        static PriorityQueueCollisionSpiEx spiEx(Ignite n) {
+            return ((PriorityQueueCollisionSpiEx)n.configuration().getCollisionSpi());
+        }
+    }
+
+    /** Consumer whose {@code accept} is allowed to throw a checked exception. */
+    private interface ConsumerX<T> {
+        /** @param t Value to consume. */
+        void accept(T t) throws Exception;
+    }
+
+    /** Task that maps one job from {@link #jobFactory} to every subgrid node and returns {@code WAIT} for each result. */
+    private static class SimpleTask extends ComputeTaskAdapter<Void, Void> {
+        /** Supplies a job instance for each node of the subgrid. */
+        final Supplier<? extends ComputeJob> jobFactory;
+
+        /** @param factory Supplier of the jobs to map. */
+        private SimpleTask(Supplier<? extends ComputeJob> factory) {
+            jobFactory = factory;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            Void arg
+        ) throws IgniteException {
+            return subgrid.stream().collect(toMap(n -> jobFactory.get(), identity()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(
+            ComputeJobResult res,
+            List<ComputeJobResult> rcvd
+        ) throws IgniteException {
+            return ComputeJobResultPolicy.WAIT;
+        }
+    }
+
+    /** Job that signals its start and then blocks on {@link #waitFut}; only the timeout is written on serialization. */
+    private static class WaitJob extends ComputeJobAdapter implements Externalizable {
+        /** Completed when {@link #execute()} begins. */
+        GridFutureAdapter<Void> onStartFut;
+
+        /** Future that {@link #execute()} waits on. */
+        GridFutureAdapter<Void> waitFut;
+
+        /** Timeout passed to {@code waitFut.get}. */
+        long waitTimeout;
+
+        /** Public no-arg constructor (as {@link Externalizable} requires); creates both futures. */
+        public WaitJob() {
+            onStartFut = new GridFutureAdapter<>();
+            waitFut = new GridFutureAdapter<>();
+        }
+
+        /** @param timeout Timeout for waiting on {@link #waitFut}. */
+        private WaitJob(long timeout) {
+            this();
+
+            waitTimeout = timeout;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            onStartFut.onDone();
+
+            try {
+                waitFut.get(waitTimeout);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(waitTimeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            waitTimeout = in.readLong();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index cf7a573..2e6bcfe 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -1878,6 +1878,22 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     }
 
     /**
+     * @param exp Expected map.
+     * @param act Actual map, must have the same size and equal values for every expected key.
+     */
+    protected static void assertEqualsMaps(Map<?, ?> exp, Map<?, ?> act) {
+        if (exp.size() != act.size())
+            fail("Maps are not equal:\nExpected:\t" + exp + "\nActual:\t" + act);
+
+        for (Map.Entry<?, ?> e : exp.entrySet()) {
+            if (!act.containsKey(e.getKey()))
+                fail("Maps are not equal (missing key " + e.getKey() + "):\nExpected:\t" + exp + "\nActual:\t" + act);
+            else if (!F.eq(e.getValue(), act.get(e.getKey())))
+                fail("Maps are not equal (key " + e.getKey() + "):\nExpected:\t" + exp + "\nActual:\t" + act);
+        }
+    }
+
+    /**
      * @param ignite Ignite instance.
      * @param clo Closure.
      * @return Result of closure execution.
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 61cc2a4..61c1899 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -79,6 +79,8 @@ import org.apache.ignite.internal.VisorManagementEventSelfTest;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfTest;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
 import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
+import org.apache.ignite.internal.processors.compute.ComputeGridMonitorTest;
+import org.apache.ignite.internal.processors.compute.ComputeJobStatusTest;
 import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorConfigurationSelfTest;
 import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
 import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
@@ -172,7 +174,10 @@ import org.junit.runners.Suite;
 
     IgniteComputeJobOneThreadTest.class,
 
-    VisorManagementEventSelfTest.class
+    VisorManagementEventSelfTest.class,
+
+    ComputeGridMonitorTest.class,
+    ComputeJobStatusTest.class
 })
 public class IgniteComputeGridTestSuite {
 }