You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/01/19 13:56:42 UTC
[ignite] branch master updated: IGNITE-16325 Add compute task monitoring to keep track of their current status (#9747)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new abec656 IGNITE-16325 Add compute task monitoring to keep track of their current status (#9747)
abec656 is described below
commit abec656e1bfa12cac7ec9376081003ecb3b05e4d
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Jan 19 16:56:10 2022 +0300
IGNITE-16325 Add compute task monitoring to keep track of their current status (#9747)
---
.../apache/ignite/internal/GridJobResultImpl.java | 16 +-
.../ignite/internal/GridTaskSessionImpl.java | 70 +++-
.../processors/job/ComputeJobStatusEnum.java | 41 ++
.../internal/processors/job/GridJobProcessor.java | 57 ++-
.../internal/processors/job/GridJobWorker.java | 86 +++-
.../session/GridTaskSessionProcessor.java | 16 +-
.../processors/task/GridTaskEventListener.java | 19 +-
.../processors/task/GridTaskProcessor.java | 139 ++++++-
.../internal/processors/task/GridTaskWorker.java | 84 +++-
.../task/monitor/ComputeGridMonitor.java | 39 ++
.../processors/task/monitor/ComputeTaskStatus.java | 227 ++++++++++
.../task/monitor/ComputeTaskStatusEnum.java | 32 ++
.../task/monitor/ComputeTaskStatusSnapshot.java | 89 ++++
.../processors/compute/ComputeGridMonitorTest.java | 390 ++++++++++++++++++
.../processors/compute/ComputeJobStatusTest.java | 457 +++++++++++++++++++++
.../junits/common/GridCommonAbstractTest.java | 16 +
.../testsuites/IgniteComputeGridTestSuite.java | 7 +-
17 files changed, 1711 insertions(+), 74 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobResultImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobResultImpl.java
index 8fbce3f..860b44d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobResultImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobResultImpl.java
@@ -43,19 +43,19 @@ public class GridJobResultImpl implements ComputeJobResult {
/** */
private ClusterNode node;
- /** */
+ /** Guarded by {@code this}. */
private Object data;
- /** */
+ /** Guarded by {@code this}. */
private IgniteException ex;
- /** */
+ /** Guarded by {@code this}. */
private boolean hasRes;
- /** */
+ /** Guarded by {@code this}. */
private boolean isCancelled;
- /** */
+ /** Guarded by {@code this}. */
private boolean isOccupied;
/**
@@ -128,10 +128,12 @@ public class GridJobResultImpl implements ComputeJobResult {
* @param jobAttrs Job attributes.
* @param isCancelled Whether job was cancelled or not.
*/
- public synchronized void onResponse(@Nullable Object data,
+ public synchronized void onResponse(
+ @Nullable Object data,
@Nullable IgniteException ex,
@Nullable Map<Object, Object> jobAttrs,
- boolean isCancelled) {
+ boolean isCancelled
+ ) {
this.data = data;
this.ex = ex;
this.isCancelled = isCancelled;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
index 45a0e94..1088032 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
@@ -43,6 +43,10 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.unmodifiableCollection;
+
/**
* Task session.
*/
@@ -74,7 +78,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
/** */
private Collection<ComputeJobSibling> siblings;
- /** */
+ /** Guarded by {@link #mux}. */
private Map<Object, Object> attrs;
/** */
@@ -120,6 +124,17 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
private final String execName;
/**
+ * Nodes on which the jobs of the task will be executed.
+ * Guarded by {@link #mux}.
+ */
+ @Nullable private List<UUID> jobNodes;
+
+ /** User who created the session, {@code null} if security is not enabled. */
+ @Nullable private final Object login;
+
+ /**
+ * Constructor.
+ *
* @param taskNodeId Task node ID.
* @param taskName Task name.
* @param dep Deployment.
@@ -135,6 +150,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
* @param fullSup Session full support enabled flag.
* @param internal Internal task flag.
* @param execName Custom executor name.
+ * @param login User who created the session, {@code null} if security is not enabled.
*/
public GridTaskSessionImpl(
UUID taskNodeId,
@@ -151,7 +167,9 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
GridKernalContext ctx,
boolean fullSup,
boolean internal,
- @Nullable String execName) {
+ @Nullable String execName,
+ @Nullable Object login
+ ) {
assert taskNodeId != null;
assert taskName != null;
assert sesId != null;
@@ -169,7 +187,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
this.sesId = sesId;
this.startTime = startTime;
this.endTime = endTime;
- this.siblings = siblings != null ? Collections.unmodifiableCollection(siblings) : null;
+ this.siblings = siblings != null ? unmodifiableCollection(siblings) : null;
this.ctx = ctx;
if (attrs != null && !attrs.isEmpty()) {
@@ -183,6 +201,8 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
this.execName = execName;
mapFut = new IgniteFutureImpl(new GridFutureAdapter());
+
+ this.login = login;
}
/** {@inheritDoc} */
@@ -359,7 +379,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
checkFullSupport();
if (keys.isEmpty())
- return Collections.emptyMap();
+ return emptyMap();
if (timeout == 0)
timeout = Long.MAX_VALUE;
@@ -518,7 +538,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
*/
public void setJobSiblings(Collection<ComputeJobSibling> siblings) {
synchronized (mux) {
- this.siblings = Collections.unmodifiableCollection(siblings);
+ this.siblings = unmodifiableCollection(siblings);
}
}
@@ -534,7 +554,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
tmp.addAll(this.siblings);
tmp.addAll(siblings);
- this.siblings = Collections.unmodifiableCollection(tmp);
+ this.siblings = unmodifiableCollection(tmp);
}
}
@@ -606,7 +626,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
checkFullSupport();
synchronized (mux) {
- return attrs == null || attrs.isEmpty() ? Collections.emptyMap() : U.sealMap(attrs);
+ return attrs == null || attrs.isEmpty() ? emptyMap() : U.sealMap(attrs);
}
}
@@ -914,6 +934,42 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
return execName;
}
+ /**
+ * Sets the nodes on which the jobs of the task will be executed (stores a copy; empty list if none given).
+ *
+ * @param jobNodes Nodes on which the jobs of the task will be executed.
+ */
+ public void jobNodes(Collection<UUID> jobNodes) {
+ synchronized (mux) {
+ this.jobNodes = F.isEmpty(jobNodes) ? emptyList() : new ArrayList<>(jobNodes);
+ }
+ }
+
+ /**
+ * @return Copy of the nodes on which the jobs of the task will be executed, empty list if not set.
+ */
+ public List<UUID> jobNodesSafeCopy() {
+ synchronized (mux) {
+ return F.isEmpty(jobNodes) ? emptyList() : new ArrayList<>(jobNodes);
+ }
+ }
+
+ /**
+ * @return Copy of all session attributes (empty map if none), taken without the {@code checkFullSupport()} check.
+ */
+ public Map<Object, Object> attributesSafeCopy() {
+ synchronized (mux) {
+ return F.isEmpty(attrs) ? emptyMap() : new HashMap<>(attrs);
+ }
+ }
+
+ /**
+ * @return User who created the session, {@code null} if security is not enabled.
+ */
+ public Object login() {
+ return login;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridTaskSessionImpl.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/ComputeJobStatusEnum.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/ComputeJobStatusEnum.java
new file mode 100644
index 0000000..a700535
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/ComputeJobStatusEnum.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.job;
+
+/**
+ * Status of a compute job as tracked by {@code GridJobWorker}.
+ */
+public enum ComputeJobStatusEnum {
+ /** Initial status: the job has been created but has not started executing yet. */
+ QUEUED,
+
+ /** The job has started executing; also restored once the job is no longer held. */
+ RUNNING,
+
+ /** The job has been put on hold. */
+ SUSPENDED,
+
+ /** The job finished with an error, timed out, or its reply could not be sent to the task node. */
+ FAILED,
+
+ /** The job has been cancelled. */
+ CANCELLED,
+
+ /** The job completed successfully. */
+ FINISHED;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index f47db23..5abe0dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -36,6 +36,8 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDeploymentException;
import org.apache.ignite.IgniteException;
@@ -80,7 +82,6 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -97,6 +98,8 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingBy;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JOBS_HISTORY_SIZE;
import static org.apache.ignite.events.EventType.EVT_JOB_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -737,26 +740,18 @@ public class GridJobProcessor extends GridProcessorAdapter {
// Put either job ID or session ID (they are unique).
cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys);
- IgnitePredicate<GridJobWorker> idsMatch = new P1<GridJobWorker>() {
- @Override public boolean apply(GridJobWorker e) {
- return sesId != null ?
- jobId != null ?
- e.getSession().getId().equals(sesId) && e.getJobId().equals(jobId) :
- e.getSession().getId().equals(sesId) :
- e.getJobId().equals(jobId);
- }
- };
+ Predicate<GridJobWorker> idsMatch = idMatch(sesId, jobId);
// If we don't have jobId then we have to iterate
if (jobId == null) {
if (!jobAlwaysActivate) {
for (GridJobWorker job : passiveJobs.values()) {
- if (idsMatch.apply(job))
+ if (idsMatch.test(job))
cancelPassiveJob(job);
}
}
for (GridJobWorker job : activeJobs.values()) {
- if (idsMatch.apply(job))
+ if (idsMatch.test(job))
cancelActiveJob(job, sys);
}
}
@@ -764,13 +759,13 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (!jobAlwaysActivate) {
GridJobWorker passiveJob = passiveJobs.get(jobId);
- if (passiveJob != null && idsMatch.apply(passiveJob) && cancelPassiveJob(passiveJob))
+ if (passiveJob != null && idsMatch.test(passiveJob) && cancelPassiveJob(passiveJob))
return;
}
GridJobWorker activeJob = activeJobs.get(jobId);
- if (activeJob != null && idsMatch.apply(activeJob))
+ if (activeJob != null && idsMatch.test(activeJob))
cancelActiveJob(activeJob, sys);
}
}
@@ -1248,7 +1243,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
sesAttrs,
req.isSessionFullSupport(),
req.isInternal(),
- req.executorName());
+ req.executorName(),
+ ctx.security().enabled() ? ctx.security().securityContext().subject().login() : null
+ );
taskSes.setCheckpointSpi(req.getCheckpointSpi());
taskSes.setClassLoader(dep.classLoader());
@@ -2295,4 +2292,34 @@ public class GridJobProcessor extends GridProcessorAdapter {
return sizex();
}
}
+
+ /**
+ * @param sesId Task session ID.
+ * @return Counts of the task's active, cancelled and (unless jobs are always activated) passive jobs, grouped by job status.
+ */
+ public Map<ComputeJobStatusEnum, Long> jobStatuses(IgniteUuid sesId) {
+ return Stream.concat(
+ activeJobs.values().stream(),
+ jobAlwaysActivate ? cancelledJobs.values().stream() :
+ Stream.concat(passiveJobs.values().stream(), cancelledJobs.values().stream())
+ )
+ .filter(idMatch(sesId, null))
+ .collect(groupingBy(GridJobWorker::status, counting()));
+ }
+
+ /**
+ * @param sesId Task session ID, {@code null} to match by job ID only.
+ * @param jobId Job ID, {@code null} to match by session ID only.
+ * @return Predicate matching workers by every non-null ID given (at least one ID must be non-null).
+ */
+ private Predicate<GridJobWorker> idMatch(@Nullable IgniteUuid sesId, @Nullable IgniteUuid jobId) {
+ assert sesId != null || jobId != null;
+
+ if (sesId == null)
+ return w -> jobId.equals(w.getJobId());
+ else if (jobId == null)
+ return w -> sesId.equals(w.getSession().getId());
+ else
+ return w -> sesId.equals(w.getSession().getId()) && jobId.equals(w.getJobId());
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index fa8d670..cb2a5f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -80,6 +80,12 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.QUEUED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.RUNNING;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.SUSPENDED;
/**
* Job worker.
@@ -179,6 +185,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
/** Security context. */
private final SecurityContext secCtx;
+ /** Job status. */
+ private volatile ComputeJobStatusEnum status = QUEUED;
+
/**
* @param ctx Kernal context.
* @param dep Grid deployment.
@@ -442,9 +451,12 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
boolean res;
- if (res = holdLsnr.onHeld(this))
+ if (res = holdLsnr.onHeld(this)) {
held.incrementAndGet();
+ status = SUSPENDED;
+ }
+
return res;
}
@@ -513,6 +525,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
isStarted = true;
+ status = RUNNING;
+
// Event notification.
evtLsnr.onJobStarted(this);
@@ -565,8 +579,10 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
super.cancel();
if (!skipNtf) {
- if (holdLsnr.onUnheld(this))
- held.decrementAndGet();
+ if (holdLsnr.onUnheld(this)) {
+ if (held.decrementAndGet() == 0)
+ status = RUNNING;
+ }
else {
if (log.isDebugEnabled())
log.debug("Ignoring job execution (job was not held).");
@@ -753,11 +769,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
if (log.isDebugEnabled())
log.debug("Cancelling job: " + ses);
- U.wrapThreadLoader(dep.classLoader(), new IgniteRunnable() {
- @Override public void run() {
- try (OperationSecurityContext c = ctx.security().withContext(secCtx)) {
- job0.cancel();
- }
+ status = CANCELLED;
+
+ U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> {
+ try (OperationSecurityContext c = ctx.security().withContext(secCtx)) {
+ job0.cancel();
}
});
}
@@ -814,9 +830,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
* @param ex Error.
* @param sndReply If {@code true}, reply will be sent.
*/
- void finishJob(@Nullable Object res,
+ void finishJob(
+ @Nullable Object res,
@Nullable IgniteException ex,
- boolean sndReply) {
+ boolean sndReply
+ ) {
finishJob(res, ex, sndReply, false);
}
@@ -826,11 +844,12 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
* @param sndReply If {@code true}, reply will be sent.
* @param retry If {@code true}, retry response will be sent.
*/
- void finishJob(@Nullable Object res,
+ void finishJob(
+ @Nullable Object res,
@Nullable IgniteException ex,
boolean sndReply,
- boolean retry)
- {
+ boolean retry
+ ) {
// Avoid finishing a job more than once from different threads.
if (!finishing.compareAndSet(false, true))
return;
@@ -859,6 +878,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
U.warn(log, "Failed to reply to sender node because it left grid [nodeId=" + taskNode.id() +
", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + ']');
+ status = FAILED;
+
// Record job reply failure.
if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
evts = addEvent(evts, EVT_JOB_FAILED, "Job reply failed (task node left grid): " + job);
@@ -873,7 +894,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
Map<Object, Object> attrs = jobCtx.getAttributes();
- // Try serialize response, and if exception - return to client.
+ // Try to serialize response, and if exception - return to client.
if (!loc) {
try {
resBytes = U.marshal(marsh, res);
@@ -924,6 +945,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
}
if (ex != null) {
+ status = FAILED;
+
if (isStarted) {
// Job failed.
if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
@@ -934,8 +957,12 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started " +
"[ex=" + ex + ", job=" + job + ']');
}
- else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
- evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
+ else {
+ status = FINISHED;
+
+ if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
+ evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
+ }
GridJobExecuteResponse jobRes = new GridJobExecuteResponse(
ctx.localNodeId(),
@@ -1006,6 +1033,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
}
else {
if (ex != null) {
+ status = FAILED;
+
if (isStarted) {
if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" + ex +
@@ -1015,13 +1044,21 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started [ex=" + ex +
", job=" + job + ']');
}
- else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
- evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
+ else {
+ status = FINISHED;
+
+ if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED))
+ evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null);
+ }
}
}
- // Job timed out.
- else if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
- evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to timeout: " + job);
+ else {
+ // Job timed out.
+ status = FAILED;
+
+ if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
+ evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to timeout: " + job);
+ }
}
finally {
if (evts != null) {
@@ -1103,6 +1140,13 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
}
+ /**
+ * @return Job status.
+ */
+ ComputeJobStatusEnum status() {
+ return status;
+ }
+
/** {@inheritDoc} */
@Override public int hashCode() {
IgniteUuid jobId = ses.getJobId();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
index 3feeb65..a46b731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
@@ -65,6 +65,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
}
/**
+ * Creates task session.
+ *
* @param sesId Session ID.
* @param taskNodeId Task node ID.
* @param taskName Task name.
@@ -79,6 +81,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
* @param fullSup {@code True} to enable distributed session attributes and checkpoints.
* @param internal {@code True} in case of internal task.
* @param execName Custom executor name.
+ * @param login User who created the session, {@code null} if security is not enabled.
* @return New session if one did not exist, or existing one.
*/
public GridTaskSessionImpl createTaskSession(
@@ -95,7 +98,9 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
Map<Object, Object> attrs,
boolean fullSup,
boolean internal,
- @Nullable String execName) {
+ @Nullable String execName,
+ @Nullable Object login
+ ) {
if (!fullSup) {
return new GridTaskSessionImpl(
taskNodeId,
@@ -112,7 +117,9 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
ctx,
false,
internal,
- execName);
+ execName,
+ login
+ );
}
while (true) {
@@ -136,7 +143,10 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
ctx,
true,
internal,
- execName));
+ execName,
+ login
+ )
+ );
if (old != null)
ses = old;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskEventListener.java
index 6c4556b..377cf48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskEventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskEventListener.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.task;
import java.util.EventListener;
import java.util.UUID;
import org.apache.ignite.internal.GridJobSiblingImpl;
+import org.jetbrains.annotations.Nullable;
/**
* Listener for task events.
@@ -28,7 +29,14 @@ interface GridTaskEventListener extends EventListener {
/**
* @param worker Started grid task worker.
*/
- public void onTaskStarted(GridTaskWorker<?, ?> worker);
+ void onTaskStarted(GridTaskWorker<?, ?> worker);
+
+ /**
+ * Callback on splitting the task into jobs.
+ *
+ * @param worker Grid task worker.
+ */
+ void onJobsMapped(GridTaskWorker<?, ?> worker);
/**
* @param worker Grid task worker.
@@ -41,16 +49,19 @@ interface GridTaskEventListener extends EventListener {
* @param sib Job sibling.
* @param nodeId Failover node ID.
*/
- public void onJobFailover(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib, UUID nodeId);
+ void onJobFailover(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib, UUID nodeId);
/**
* @param worker Grid task worker.
* @param sib Job sibling.
*/
- public void onJobFinished(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib);
+ void onJobFinished(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib);
/**
+ * Callback on finish of task execution.
+ *
* @param worker Task worker for finished grid task.
+ * @param err Reason for the failure of the task, {@code null} if the task completed successfully.
*/
- public void onTaskFinished(GridTaskWorker<?, ?> worker);
+ void onTaskFinished(GridTaskWorker<?, ?> worker, @Nullable Throwable err);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index da7f6ca..9624af8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
@@ -38,6 +39,7 @@ import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskMapAsync;
import org.apache.ignite.compute.ComputeTaskName;
+import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -55,6 +57,7 @@ import org.apache.ignite.internal.GridTaskSessionRequest;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
@@ -65,8 +68,12 @@ import org.apache.ignite.internal.managers.systemview.walker.ComputeTaskViewWalk
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.processors.task.monitor.ComputeGridMonitor;
+import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatus;
+import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusSnapshot;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
@@ -87,6 +94,7 @@ import org.apache.ignite.spi.systemview.view.ComputeTaskView;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.emptyMap;
import static org.apache.ignite.events.EventType.EVT_MANAGEMENT_TASK_STARTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -160,6 +168,19 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
private final boolean isPersistenceEnabled;
/**
+ * Task statuses update monitors.
+ * Guarded by {@link #lock}.
+ */
+ private final Collection<ComputeGridMonitor> taskStatusMonitors = ConcurrentHashMap.newKeySet();
+
+ /**
+ * Snapshots of task statuses.
+ * Mapping: {@link ComputeTaskSession#getId} -> task status.
+ * Guarded by {@link #lock}.
+ */
+ private final ConcurrentMap<IgniteUuid, ComputeTaskStatusSnapshot> taskStatusSnapshots = new ConcurrentHashMap<>();
+
+ /**
* @param ctx Kernal context.
*/
public GridTaskProcessor(GridKernalContext ctx) {
@@ -758,11 +779,13 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
topPred,
startTime,
endTime,
- Collections.<ComputeJobSibling>emptyList(),
- Collections.emptyMap(),
+ Collections.emptyList(),
+ emptyMap(),
fullSup,
internal,
- execName);
+ execName,
+ ctx.security().enabled() ? ctx.security().securityContext().subject().login() : null
+ );
ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx);
@@ -1053,6 +1076,8 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
ctx.event().record(evt);
}
+ notifyTaskStatusMonitors(ComputeTaskStatus.snapshot(ses), false);
+
IgniteCheckedException ex = null;
// Every job gets an individual message to keep track of ghost requests.
@@ -1274,6 +1299,17 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
// Register for timeout notifications.
if (worker.endTime() < Long.MAX_VALUE)
ctx.timeout().addTimeoutObject(worker);
+
+ GridTaskSessionImpl session = worker.getSession();
+
+ notifyTaskStatusMonitors(ComputeTaskStatus.snapshot(session), false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onJobsMapped(GridTaskWorker<?, ?> worker) {
+ GridTaskSessionImpl session = worker.getSession();
+
+ notifyTaskStatusMonitors(ComputeTaskStatus.snapshot(session), false);
}
/** {@inheritDoc} */
@@ -1317,7 +1353,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
}
/** {@inheritDoc} */
- @Override public void onTaskFinished(GridTaskWorker<?, ?> worker) {
+ @Override public void onTaskFinished(GridTaskWorker<?, ?> worker, @Nullable Throwable err) {
GridTaskSessionImpl ses = worker.getSession();
if (ses.isFullSupport()) {
@@ -1366,6 +1402,8 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
U.currentTimeMillis() - ses.getStartTime(),
worker.affPartId());
}
+
+ notifyTaskStatusMonitors(ComputeTaskStatus.onFinishTask(worker.getSession(), err), true);
}
}
@@ -1552,4 +1590,97 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
? ", task name: " + task.getSession().getTaskName()
: "";
}
+
+    /**
+     * Subscribes a monitor to task status updates on this node.
+     *
+     * <p>The monitor is first registered, then {@link ComputeGridMonitor#processStatusSnapshots} is called once
+     * with the snapshots currently held in {@link #taskStatusSnapshots}; after that only
+     * {@link ComputeGridMonitor#processStatusChange} will be called for this monitor.
+     *
+     * <p>Both steps run under the write lock of {@link #lock}, whereas {@link #notifyTaskStatusMonitors} runs
+     * under the read lock, so no status change is published between registration and delivery of the initial
+     * snapshots. Because the callback is invoked while the lock is held, it should return quickly.
+     *
+     * <p>An exception thrown by {@link ComputeGridMonitor#processStatusSnapshots} is logged and not rethrown;
+     * the monitor stays registered in that case.
+     *
+     * @param monitor Task status update monitor.
+     * @throws NodeStoppingException If the {@code stopping} flag is set; the monitor is not registered then.
+     */
+    public void listenStatusUpdates(ComputeGridMonitor monitor) throws NodeStoppingException {
+        lock.writeLock();
+
+        try {
+            // Refuse new subscriptions once the processor is stopping.
+            if (stopping)
+                throw new NodeStoppingException("Failed to add monitor due to grid shutdown: " + monitor);
+
+            taskStatusMonitors.add(monitor);
+
+            try {
+                monitor.processStatusSnapshots(taskStatusSnapshots.values());
+            }
+            catch (Throwable t) {
+                // A misbehaving monitor must not break subscription; it is only logged.
+                log.error("Error processing snapshots of task statuses: " + monitor, t);
+            }
+        }
+        finally {
+            lock.writeUnlock();
+        }
+    }
+
+    /**
+     * Unsubscribes a monitor from task status updates.
+     *
+     * <p>Removal happens under the write lock of {@link #lock}, and {@link #notifyTaskStatusMonitors} delivers
+     * changes under the read lock, so once this method returns no notification is still being delivered
+     * to the removed monitor and no later notification will reach it.
+     *
+     * @param monitor Task status update monitor.
+     */
+    public void stopListenStatusUpdates(ComputeGridMonitor monitor) {
+        lock.writeLock();
+
+        try {
+            taskStatusMonitors.remove(monitor);
+        }
+        finally {
+            lock.writeUnlock();
+        }
+    }
+
+    /**
+     * Records a task status snapshot in {@link #taskStatusSnapshots} and passes it to every registered monitor.
+     *
+     * <p>Guarded by the read lock of {@link #lock}: the update of {@link #taskStatusSnapshots} together with the
+     * notification of {@link #taskStatusMonitors} is atomic with respect to {@link #listenStatusUpdates} and
+     * {@link #stopListenStatusUpdates} (which take the write lock), but several notifications may run at the
+     * same time. NOTE(review): this relies on {@link #taskStatusSnapshots} and {@link #taskStatusMonitors}
+     * being thread-safe collections — confirm at their declarations.
+     *
+     * <p>An exception thrown by a monitor is logged and does not prevent the remaining monitors from being
+     * notified.
+     *
+     * @param snapshotChanges New snapshot of the task status.
+     * @param remove {@code True} if it is necessary to remove the {@code snapshotChanges} (keyed by its session ID)
+     *      from {@link #taskStatusSnapshots}, e.g. when the task has finished; otherwise it will be updated.
+     *      Monitors are notified in both cases.
+     */
+    private void notifyTaskStatusMonitors(ComputeTaskStatusSnapshot snapshotChanges, boolean remove) {
+        lock.readLock();
+
+        try {
+            // Keep the latest snapshot of every running task so that new subscribers receive it.
+            if (remove)
+                taskStatusSnapshots.remove(snapshotChanges.sessionId());
+            else
+                taskStatusSnapshots.put(snapshotChanges.sessionId(), snapshotChanges);
+
+            for (ComputeGridMonitor monitor : taskStatusMonitors) {
+                try {
+                    monitor.processStatusChange(snapshotChanges);
+                }
+                catch (Throwable t) {
+                    // A failing monitor must not affect task processing or the other monitors.
+                    log.error("Error processing task status diff: " + monitor, t);
+                }
+            }
+        }
+        finally {
+            lock.readUnlock();
+        }
+    }
+
+    /**
+     * Counts the jobs of a task by job status. Only jobs that have already sent a response, or that are
+     * being executed on the local node, are taken into account.
+     *
+     * @param sesId Task session ID.
+     * @return Number of jobs per job status; an empty map if {@code tasks} holds no worker for the session.
+     */
+    public Map<ComputeJobStatusEnum, Long> jobStatuses(IgniteUuid sesId) {
+        GridTaskWorker<?, ?> worker = tasks.get(sesId);
+
+        return worker == null ? emptyMap() : worker.jobStatuses();
+    }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 6cdcd79..4055d6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -67,6 +68,7 @@ import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.AffinityTask;
+import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
import org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -86,6 +88,8 @@ import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.resources.TaskContinuousMapperResource;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
import static org.apache.ignite.compute.ComputeJobResultPolicy.WAIT;
import static org.apache.ignite.events.EventType.EVT_JOB_FAILED_OVER;
@@ -100,6 +104,9 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
@@ -158,7 +165,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
/** */
private final GridTaskEventListener evtLsnr;
- /** */
+ /** Guarded by {@link #mux}. */
private Map<IgniteUuid, GridJobResultImpl> jobRes;
/** */
@@ -499,8 +506,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
ses.setClassLoader(dep.classLoader());
// Nodes are ignored by affinity tasks.
- final List<ClusterNode> shuffledNodes =
- affCacheIds == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
+ final List<ClusterNode> shuffledNodes = affCacheIds == null ? getTaskTopology() : emptyList();
// Load balancer.
ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);
@@ -513,16 +519,15 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
// Inject resources.
ctx.resource().inject(dep, task, ses, balancer, mapper);
- Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(),
- new Callable<Map<? extends ComputeJob, ClusterNode>>() {
- @Override public Map<? extends ComputeJob, ClusterNode> call() {
- return task.map(shuffledNodes, arg);
- }
- });
+ Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(
+ dep.classLoader(),
+ (Callable<Map<? extends ComputeJob, ClusterNode>>)() -> task.map(shuffledNodes, arg)
+ );
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Mapped task jobs to nodes [jobCnt=" + (mappedJobs != null ? mappedJobs.size() : 0) +
", mappedJobs=" + mappedJobs + ", ses=" + ses + ']');
+ }
if (F.isEmpty(mappedJobs)) {
synchronized (mux) {
@@ -634,6 +639,10 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
}
}
+ ses.jobNodes(F.viewReadOnly(jobs.values(), F.node2id()));
+
+ evtLsnr.onJobsMapped(this);
+
// Set mapped flag.
ses.onMapped();
@@ -854,7 +863,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
List<ComputeJobResult> results;
if (!resCache)
- results = Collections.emptyList();
+ results = emptyList();
else {
synchronized (mux) {
results = getRemoteResults();
@@ -1640,7 +1649,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
recordTaskEvent(EVT_TASK_FAILED, "Task failed.");
// Clean resources prior to finishing future.
- evtLsnr.onTaskFinished(this);
+ evtLsnr.onTaskFinished(this, e);
if (cancelChildren)
cancelChildren();
@@ -1674,6 +1683,57 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec
return affPartId;
}
+    /**
+     * Builds per-status job counters for this task. A job is counted either from its stored result (once a
+     * response has been received) or, for jobs of this task still pending on the local node, from the local
+     * job processor.
+     *
+     * @return Number of jobs per job status; an empty map while the jobs have not been mapped yet.
+     */
+    Map<ComputeJobStatusEnum, Long> jobStatuses() {
+        List<GridJobResultImpl> snapshot;
+
+        // Copy under the mutex so that the iteration below does not race with result updates.
+        synchronized (mux) {
+            snapshot = jobRes == null ? null : new ArrayList<>(jobRes.values());
+        }
+
+        // Nothing has been mapped yet, so there is nothing to count.
+        if (F.isEmpty(snapshot))
+            return emptyMap();
+
+        UUID locId = ctx.localNodeId();
+
+        Map<ComputeJobStatusEnum, Long> counts = new EnumMap<>(ComputeJobStatusEnum.class);
+
+        boolean hasLocPending = false;
+
+        for (GridJobResultImpl r : snapshot) {
+            if (!r.hasResponse()) {
+                // Only remember that at least one local job is still pending.
+                if (!hasLocPending)
+                    hasLocPending = locId.equals(r.getNode().id());
+
+                continue;
+            }
+
+            ComputeJobStatusEnum status = r.isCancelled() ? CANCELLED
+                : r.getException() != null ? FAILED
+                : FINISHED;
+
+            counts.merge(status, 1L, Long::sum);
+        }
+
+        // Local jobs without a response are reported by the local job processor.
+        if (hasLocPending) {
+            ctx.job().jobStatuses(getTaskSessionId())
+                .forEach((status, cnt) -> counts.merge(status, cnt, Long::sum));
+        }
+
+        return counts;
+    }
+
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
if (this == obj)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeGridMonitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeGridMonitor.java
new file mode 100644
index 0000000..82bce5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeGridMonitor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task.monitor;
+
+import java.util.Collection;
+
+/**
+ * Monitor of compute task status changes on the local node.
+ *
+ * <p>A monitor is registered with {@code GridTaskProcessor#listenStatusUpdates} and removed with
+ * {@code GridTaskProcessor#stopListenStatusUpdates}. Both callbacks are invoked while the task processor holds
+ * its internal lock, so implementations should return quickly and must not block. Exceptions thrown by the
+ * callbacks are logged by the caller and otherwise ignored.
+ */
+public interface ComputeGridMonitor {
+    /**
+     * Receives the snapshots of all tasks known to the processor at the moment of subscription.
+     * Called exactly once, right after the monitor is registered; afterwards only
+     * {@link #processStatusChange} is called.
+     *
+     * @param snapshots Snapshots of tasks.
+     */
+    void processStatusSnapshots(Collection<ComputeTaskStatusSnapshot> snapshots);
+
+    /**
+     * Receives a new snapshot of a single task whenever its status changes (task started, jobs mapped,
+     * task finished or failed). A snapshot whose status is not {@link ComputeTaskStatusEnum#RUNNING} is the
+     * last one for that task.
+     *
+     * <p>The processor only holds a read lock while notifying, so this method may be called from several
+     * threads at the same time; implementations should be thread-safe.
+     *
+     * @param snapshot Snapshot of the task.
+     */
+    void processStatusChange(ComputeTaskStatusSnapshot snapshot);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatus.java
new file mode 100644
index 0000000..ef4e40f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatus.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task.monitor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridTaskSessionImpl;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.FINISHED;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.RUNNING;
+
+/**
+ * Task status container.
+ *
+ * <p>Instances are created with {@link #snapshot} for a task in progress and with {@link #onFinishTask} for a
+ * completed task. All fields are {@code final}, which guarantees safe publication when an instance is shared
+ * between threads (it is stored by the task processor and handed to monitors). The job node list and the
+ * attribute map are copies obtained from the task session.
+ *
+ * @see ComputeTaskStatusSnapshot
+ */
+public class ComputeTaskStatus implements ComputeTaskStatusSnapshot {
+    /** Session ID of the task being executed. */
+    private final IgniteUuid sessionId;
+
+    /** Status of the task. */
+    private final ComputeTaskStatusEnum status;
+
+    /** Task name of the task this session belongs to. */
+    private final String taskName;
+
+    /** ID of the node on which task execution originated. */
+    private final UUID originatingNodeId;
+
+    /** Start of computation time for the task. */
+    private final long startTime;
+
+    /** End of computation time for the task, {@code 0} while the task is running. */
+    private final long endTime;
+
+    /** Nodes IDs on which the task jobs will execute. */
+    private final List<UUID> jobNodes;
+
+    /** All session attributes. */
+    private final Map<?, ?> attributes;
+
+    /** Reason for the failure of the task, {@code null} if the task has not failed. */
+    @Nullable private final Throwable failReason;
+
+    /** Availability of changing task attributes. */
+    private final boolean fullSupport;
+
+    /** User who created the task, {@code null} if security is not available. */
+    @Nullable private final Object createdBy;
+
+    /** Internal task flag. */
+    private final boolean internal;
+
+    /**
+     * Constructor.
+     *
+     * @param sessionId Session ID of the task being executed.
+     * @param status Status of the task.
+     * @param taskName Task name of the task this session belongs to.
+     * @param originatingNodeId ID of the node on which task execution originated.
+     * @param startTime Start of computation time for the task.
+     * @param endTime End of computation time for the task.
+     * @param jobNodes Nodes IDs on which the task jobs will execute; {@code null} or empty is stored as an
+     *      empty list.
+     * @param attributes All session attributes; {@code null} or empty is stored as an empty map.
+     * @param failReason Reason for the failure of the task.
+     * @param fullSupport Availability of changing task attributes.
+     * @param createdBy User who created the task, {@code null} if security is not available.
+     * @param internal Internal task flag.
+     */
+    private ComputeTaskStatus(
+        IgniteUuid sessionId,
+        ComputeTaskStatusEnum status,
+        String taskName,
+        UUID originatingNodeId,
+        long startTime,
+        long endTime,
+        List<UUID> jobNodes,
+        Map<?, ?> attributes,
+        @Nullable Throwable failReason,
+        boolean fullSupport,
+        @Nullable Object createdBy,
+        boolean internal
+    ) {
+        this.sessionId = sessionId;
+        this.status = status;
+        this.taskName = taskName;
+        this.originatingNodeId = originatingNodeId;
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.jobNodes = F.isEmpty(jobNodes) ? emptyList() : jobNodes;
+        this.attributes = F.isEmpty(attributes) ? emptyMap() : attributes;
+        this.failReason = failReason;
+        this.fullSupport = fullSupport;
+        this.createdBy = createdBy;
+        this.internal = internal;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid sessionId() {
+        return sessionId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String taskName() {
+        return taskName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID originatingNodeId() {
+        return originatingNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long startTime() {
+        return startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return endTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<UUID> jobNodes() {
+        return jobNodes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<?, ?> attributes() {
+        return attributes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeTaskStatusEnum status() {
+        return status;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable Throwable failReason() {
+        return failReason;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean fullSupport() {
+        return fullSupport;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable Object createBy() {
+        return createdBy;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean internal() {
+        return internal;
+    }
+
+    /**
+     * Creates the status of a task that is in progress: status {@link ComputeTaskStatusEnum#RUNNING},
+     * end time {@code 0} and no failure reason.
+     *
+     * @param sessionImp Task session.
+     * @return New instance.
+     */
+    public static ComputeTaskStatus snapshot(GridTaskSessionImpl sessionImp) {
+        return new ComputeTaskStatus(
+            sessionImp.getId(),
+            RUNNING,
+            sessionImp.getTaskName(),
+            sessionImp.getTaskNodeId(),
+            sessionImp.getStartTime(),
+            0L,
+            sessionImp.jobNodesSafeCopy(),
+            sessionImp.attributesSafeCopy(),
+            null,
+            sessionImp.isFullSupport(),
+            sessionImp.login(),
+            sessionImp.isInternal()
+        );
+    }
+
+    /**
+     * Creates a task status on finishing task: status {@link ComputeTaskStatusEnum#FINISHED} if {@code err} is
+     * {@code null}, {@link ComputeTaskStatusEnum#FAILED} otherwise; the end time is the current time.
+     *
+     * @param sessionImp Task session.
+     * @param err Reason for the failure of the task, {@code null} if the task completed successfully.
+     * @return New instance.
+     */
+    public static ComputeTaskStatus onFinishTask(GridTaskSessionImpl sessionImp, @Nullable Throwable err) {
+        return new ComputeTaskStatus(
+            sessionImp.getId(),
+            err == null ? FINISHED : FAILED,
+            sessionImp.getTaskName(),
+            sessionImp.getTaskNodeId(),
+            sessionImp.getStartTime(),
+            U.currentTimeMillis(),
+            sessionImp.jobNodesSafeCopy(),
+            sessionImp.attributesSafeCopy(),
+            err,
+            sessionImp.isFullSupport(),
+            sessionImp.login(),
+            sessionImp.isInternal()
+        );
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusEnum.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusEnum.java
new file mode 100644
index 0000000..06faa8b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusEnum.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task.monitor;
+
+/**
+ * Lifecycle state of a compute task reported in task status snapshots.
+ */
+public enum ComputeTaskStatusEnum {
+    /** The task has started and has not completed yet. */
+    RUNNING,
+
+    /** The task completed without an error. */
+    FINISHED,
+
+    /** The task completed with an error. */
+    FAILED
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusSnapshot.java
new file mode 100644
index 0000000..995e3b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/monitor/ComputeTaskStatusSnapshot.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task.monitor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Snapshot of the task status at some point in time.
+ *
+ * @see ComputeTaskStatus
+ */
+public interface ComputeTaskStatusSnapshot {
+    /**
+     * @return Session ID of the task being executed; all snapshots of the same task share it.
+     */
+    IgniteUuid sessionId();
+
+    /**
+     * @return Task name of the task this session belongs to.
+     */
+    String taskName();
+
+    /**
+     * @return ID of the node on which task execution originated.
+     */
+    UUID originatingNodeId();
+
+    /**
+     * @return Start of computation time for the task.
+     */
+    long startTime();
+
+    /**
+     * @return End of computation time for the task; {@code 0} for snapshots of a running task created by
+     *      {@link ComputeTaskStatus#snapshot}.
+     */
+    long endTime();
+
+    /**
+     * @return Nodes IDs on which the task jobs will execute; expected to be empty before the jobs are mapped.
+     */
+    List<UUID> jobNodes();
+
+    /**
+     * @return All session attributes.
+     */
+    Map<?, ?> attributes();
+
+    /**
+     * @return Status of the task.
+     */
+    ComputeTaskStatusEnum status();
+
+    /**
+     * @return Reason for the failure of the task; {@code null} unless {@link #status()} is
+     *      {@link ComputeTaskStatusEnum#FAILED}.
+     */
+    @Nullable Throwable failReason();
+
+    /**
+     * @return {@code true} if change of task attributes is available (full session support).
+     */
+    boolean fullSupport();
+
+    /**
+     * @return Login of the user who created the task, {@code null} if security is not available.
+     */
+    @Nullable Object createBy();
+
+    /**
+     * @return {@code True} if task is internal.
+     */
+    boolean internal();
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
new file mode 100644
index 0000000..ebb5be1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.GridTaskSessionImpl;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.task.monitor.ComputeGridMonitor;
+import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum;
+import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusSnapshot;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.FINISHED;
+import static org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum.RUNNING;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Test class for {@link ComputeGridMonitor}.
+ */
+public class ComputeGridMonitorTest extends GridCommonAbstractTest {
+    /** Coordinator. */
+    private static IgniteEx CRD;
+
+    /** Compute task status monitor, subscribed before each test and unsubscribed after it. */
+    private ComputeGridMonitorImpl monitor;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        stopAllGrids();
+
+        IgniteEx crd = startGrids(2);
+
+        crd.cluster().state(ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        CRD = crd;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+
+        CRD = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        CRD.context().task().listenStatusUpdates(monitor = new ComputeGridMonitorImpl());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        CRD.context().task().stopListenStatusUpdates(monitor);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new StopNodeFailureHandler();
+    }
+
+    /**
+     * Checks the status changes reported for a successfully executed task: started, mapped, finished.
+     */
+    @Test
+    public void simpleTest() {
+        ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(new NoopComputeTask(), null);
+
+        taskFut.get(getTestTimeout());
+
+        assertTrue(monitor.statusSnapshots.isEmpty());
+
+        assertEquals(3, monitor.statusChanges.size());
+
+        checkTaskStarted(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskMapped(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskFinished(monitor.statusChanges.poll(), taskFut.getTaskSession());
+    }
+
+    /**
+     * Checks the status changes reported for a task whose reduce step fails: started, mapped, failed.
+     */
+    @Test
+    public void failTaskTest() {
+        NoopComputeTask task = new NoopComputeTask() {
+            /**
+             * {@inheritDoc}
+             */
+            @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+                throw new IgniteException("FAIL TASK");
+            }
+        };
+
+        ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(task, null);
+
+        assertThrows(log, () -> taskFut.get(getTestTimeout()), IgniteException.class, null);
+
+        assertTrue(monitor.statusSnapshots.isEmpty());
+
+        assertEquals(3, monitor.statusChanges.size());
+
+        checkTaskStarted(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskMapped(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskFailed(monitor.statusChanges.poll(), taskFut.getTaskSession());
+    }
+
+    /**
+     * Checks that changing a task attribute produces an additional status change.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void changeAttributesTest() throws Exception {
+        ComputeFullWithWaitTask task = new ComputeFullWithWaitTask(getTestTimeout());
+
+        ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(task, null);
+
+        task.doneOnMapFut.get(getTestTimeout());
+
+        taskFut.getTaskSession().setAttribute("test", "test");
+
+        assertEquals(
+            "test",
+            taskFut.getTaskSession().waitForAttribute("test", getTestTimeout())
+        );
+
+        taskFut.get(getTestTimeout());
+
+        assertTrue(monitor.statusSnapshots.isEmpty());
+
+        assertEquals(4, monitor.statusChanges.size());
+
+        checkTaskStarted(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskMapped(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkAttributeChanged(monitor.statusChanges.poll(), taskFut.getTaskSession());
+        checkTaskFinished(monitor.statusChanges.poll(), taskFut.getTaskSession());
+    }
+
+    /**
+     * Checks that a monitor subscribed while a task is running receives that task's snapshot.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void snapshotsTest() throws Exception {
+        ComputeFullWithWaitTask task = new ComputeFullWithWaitTask(getTestTimeout());
+
+        ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(task, null);
+
+        task.doneOnMapFut.get(getTestTimeout());
+
+        ComputeGridMonitorImpl monitor1 = new ComputeGridMonitorImpl();
+
+        try {
+            CRD.context().task().listenStatusUpdates(monitor1);
+
+            assertTrue(monitor.statusSnapshots.isEmpty());
+
+            assertEquals(1, monitor1.statusSnapshots.size());
+
+            checkSnapshot(monitor1.statusSnapshots.poll(), taskFut.getTaskSession());
+        }
+        finally {
+            CRD.context().task().stopListenStatusUpdates(monitor1);
+        }
+
+        taskFut.get(getTestTimeout());
+    }
+
+    /** Checks a snapshot of a task that has started but whose jobs are not mapped yet. */
+    private void checkTaskStarted(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, false, false);
+    }
+
+    /** Checks a snapshot of a running task whose jobs have been mapped. */
+    private void checkTaskMapped(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, true, false);
+    }
+
+    /** Checks a snapshot of a running task produced by an attribute change. */
+    private void checkAttributeChanged(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, true, true);
+    }
+
+    /** Checks a snapshot of a successfully finished task. */
+    private void checkTaskFinished(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, FINISHED, true, true);
+    }
+
+    /** Checks a snapshot of a failed task. */
+    private void checkTaskFailed(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, FAILED, true, true);
+    }
+
+    /** Checks a snapshot of a running, mapped task including its attributes. */
+    private void checkSnapshot(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) {
+        checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, true, true);
+    }
+
+    /**
+     * Compares a task status snapshot with the task session it was taken from.
+     *
+     * @param snapshot Task status snapshot.
+     * @param session Task session.
+     * @param expStatus Expected task status.
+     * @param checkJobNodes {@code True} to compare job nodes with the session topology,
+     *      {@code false} to expect no job nodes.
+     * @param checkAttributes {@code True} to compare attributes (only for full-support sessions).
+     */
+    private void checkSnapshot(
+        ComputeTaskStatusSnapshot snapshot,
+        GridTaskSessionImpl session,
+        ComputeTaskStatusEnum expStatus,
+        boolean checkJobNodes,
+        boolean checkAttributes
+    ) {
+        assertEquals(session.getId(), snapshot.sessionId());
+        assertEquals(expStatus, snapshot.status());
+
+        assertEquals(session.getTaskName(), snapshot.taskName());
+        assertEquals(session.getTaskNodeId(), snapshot.originatingNodeId());
+        assertEquals(session.getStartTime(), snapshot.startTime());
+        assertEquals(session.isFullSupport(), snapshot.fullSupport());
+        // Compare with the snapshot; comparing the session with itself would never detect a mismatch.
+        assertEquals(session.isInternal(), snapshot.internal());
+
+        checkLogin(session, snapshot);
+
+        if (checkJobNodes) {
+            assertEquals(
+                new TreeSet<>(session.getTopology()),
+                new TreeSet<>(snapshot.jobNodes())
+            );
+        }
+        else
+            assertTrue(snapshot.jobNodes().isEmpty());
+
+        if (checkAttributes && session.isFullSupport()) {
+            assertEquals(
+                new TreeMap<>(session.getAttributes()),
+                new TreeMap<>(snapshot.attributes())
+            );
+        }
+
+        if (expStatus == FINISHED) {
+            assertTrue(snapshot.endTime() > 0L);
+            assertNull(snapshot.failReason());
+        }
+        else if (expStatus == FAILED) {
+            assertTrue(snapshot.endTime() > 0L);
+            assertNotNull(snapshot.failReason());
+        }
+        else {
+            assertEquals(0L, snapshot.endTime());
+            assertNull(snapshot.failReason());
+        }
+    }
+
+    /** Monitor that records every received snapshot in thread-safe queues. */
+    private static class ComputeGridMonitorImpl implements ComputeGridMonitor {
+        /** Snapshots received on subscription. */
+        final Queue<ComputeTaskStatusSnapshot> statusSnapshots = new ConcurrentLinkedQueue<>();
+
+        /** Status changes received after subscription. */
+        final Queue<ComputeTaskStatusSnapshot> statusChanges = new ConcurrentLinkedQueue<>();
+
+        /** {@inheritDoc} */
+        @Override public void processStatusSnapshots(Collection<ComputeTaskStatusSnapshot> snapshots) {
+            statusSnapshots.addAll(snapshots);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processStatusChange(ComputeTaskStatusSnapshot snapshot) {
+            statusChanges.add(snapshot);
+        }
+    }
+
+    /** Task that maps one no-op job to every node of the subgrid. */
+    private static class NoopComputeTask extends ComputeTaskAdapter<Void, Void> {
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            Void arg
+        ) throws IgniteException {
+            return subgrid.stream().collect(toMap(n -> new NoopComputeJob(), identity()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+    }
+
+    /** Full-support task whose jobs sleep for a while; signals {@link #doneOnMapFut} when mapping starts. */
+    @ComputeTaskSessionFullSupport
+    private static class ComputeFullWithWaitTask extends ComputeTaskAdapter<Void, Void> {
+        /** Completed when {@link #map} is invoked. */
+        final GridFutureAdapter<Void> doneOnMapFut = new GridFutureAdapter<>();
+
+        /** Timeout. */
+        final long timeout;
+
+        /**
+         * @param timeout Timeout.
+         */
+        public ComputeFullWithWaitTask(long timeout) {
+            this.timeout = timeout;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            Void arg
+        ) throws IgniteException {
+            doneOnMapFut.onDone();
+
+            return subgrid.stream().collect(toMap(n -> new NoopComputeJob() {
+                /** {@inheritDoc} */
+                @Override public Object execute() throws IgniteException {
+                    try {
+                        U.sleep(500);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+
+                    return super.execute();
+                }
+            }, identity()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+    }
+
+    /** Job that does nothing and returns {@code null}. */
+    private static class NoopComputeJob extends ComputeJobAdapter {
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            return null;
+        }
+    }
+
+    /**
+     * Checks the task creator reported by the snapshot; without security both must be {@code null}.
+     *
+     * @param session Task session.
+     * @param snapshot Task status snapshot.
+     */
+    protected void checkLogin(GridTaskSessionImpl session, ComputeTaskStatusSnapshot snapshot) {
+        assertNull(session.login());
+        assertNull(snapshot.createBy());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
new file mode 100644
index 0000000..17c783d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.compute;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.collision.GridCollisionManager;
+import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.internal.processors.task.GridTaskProcessor;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.CollisionExternalListener;
+import org.apache.ignite.spi.collision.CollisionJobContext;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.emptyMap;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.QUEUED;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.RUNNING;
+import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.SUSPENDED;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
+/**
+ * Class for testing {@link GridTaskProcessor#jobStatuses} and {@link GridJobProcessor#jobStatuses}.
+ */
+public class ComputeJobStatusTest extends GridCommonAbstractTest {
+    /** Coordinator. */
+    private static IgniteEx node0;
+
+    /** Second node. */
+    private static IgniteEx node1;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        stopAllGrids();
+
+        IgniteEx crd = startGrids(2);
+
+        crd.cluster().state(ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        node0 = crd;
+        node1 = grid(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+
+        node0 = null;
+        node1 = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Each test starts with collisions not handled (jobs stay queued) and no job being waited for.
+        applyAllNodes(PriorityQueueCollisionSpiEx::reset);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setCollisionSpi(new PriorityQueueCollisionSpiEx())
+            // Disable automatic update of metrics, which can call PriorityQueueCollisionSpi.onCollision
+            // and thus activate (execute) jobs without the test asking for it.
+            .setMetricsUpdateFrequency(Long.MAX_VALUE)
+            // NOTE(review): presumably raised so it is not smaller than the metrics update frequency; confirm.
+            .setClientFailureDetectionTimeout(Long.MAX_VALUE);
+    }
+
+    /**
+     * Checks that requesting statuses for a non-existent task causes no errors and yields empty statistics.
+     */
+    @Test
+    public void testNoStatistics() {
+        IgniteUuid sesId = IgniteUuid.fromUuid(UUID.randomUUID());
+
+        checkTaskJobStatuses(sesId, null, null);
+        checkJobJobStatuses(sesId, null, null);
+    }
+
+    /**
+     * Checks that the statuses of the job will be:
+     * {@link ComputeJobStatusEnum#QUEUED} -> {@link ComputeJobStatusEnum#RUNNING} -> {@link ComputeJobStatusEnum#FINISHED}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFinishedTasks() throws Exception {
+        checkJobStatuses(FINISHED);
+    }
+
+    /**
+     * Checks that the statuses of the job will be:
+     * {@link ComputeJobStatusEnum#QUEUED} -> {@link ComputeJobStatusEnum#RUNNING} -> {@link ComputeJobStatusEnum#FAILED}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFailedTasks() throws Exception {
+        checkJobStatuses(FAILED);
+    }
+
+    /**
+     * Checks that the statuses of the job will be:
+     * {@link ComputeJobStatusEnum#QUEUED} -> {@link ComputeJobStatusEnum#RUNNING} -> {@link ComputeJobStatusEnum#CANCELLED}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelledTasks() throws Exception {
+        checkJobStatuses(CANCELLED);
+    }
+
+    /**
+     * Checks that the statuses of the job will be:
+     * {@link ComputeJobStatusEnum#QUEUED} -> {@link ComputeJobStatusEnum#RUNNING} ->
+     * {@link ComputeJobStatusEnum#SUSPENDED} -> {@link ComputeJobStatusEnum#FINISHED}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSuspendedTasks() throws Exception {
+        checkJobStatuses(SUSPENDED);
+    }
+
+    /**
+     * Executes a {@link SimpleTask} from node0 that sends one {@link WaitJob} to every node, drives the job on
+     * node0 to the status {@code exp} and checks the job statistics of the task and job processors on each step.
+     *
+     * @param exp Status the job on node0 is driven to (a suspended job is resumed and finished afterwards).
+     * @throws Exception If failed.
+     */
+    private void checkJobStatuses(ComputeJobStatusEnum exp) throws Exception {
+        applyAllNodes(spiEx -> spiEx.waitJobCls = WaitJob.class);
+
+        ComputeTaskFuture<Void> taskFut = node0.compute().executeAsync(
+            new SimpleTask(() -> new WaitJob(getTestTimeout())),
+            null
+        );
+
+        // We are waiting for the jobs (PriorityQueueCollisionSpiEx#waitJobCls == WaitJob.class)
+        // to be received on the nodes to ensure that the correct statistics are obtained.
+        applyAllNodes(spiEx -> spiEx.waitJobFut.get(getTestTimeout()));
+
+        IgniteUuid sesId = taskFut.getTaskSession().getId();
+
+        checkTaskJobStatuses(sesId, QUEUED, null);
+        checkJobJobStatuses(sesId, QUEUED, QUEUED);
+
+        PriorityQueueCollisionSpiEx spiEx0 = PriorityQueueCollisionSpiEx.spiEx(node0);
+        PriorityQueueCollisionSpiEx spiEx1 = PriorityQueueCollisionSpiEx.spiEx(node1);
+
+        WaitJob waitJob0 = spiEx0.task();
+        WaitJob waitJob1 = spiEx1.task();
+
+        // Activating and waiting for a job (WaitJob) to start on node0.
+        spiEx0.handleCollisions();
+        waitJob0.onStartFut.get(getTestTimeout());
+
+        checkTaskJobStatuses(sesId, RUNNING, null);
+        checkJobJobStatuses(sesId, RUNNING, QUEUED);
+
+        // Activating and waiting for a job (WaitJob) to start on node1.
+        spiEx1.handleCollisions();
+        waitJob1.onStartFut.get(getTestTimeout());
+
+        checkTaskJobStatuses(sesId, RUNNING, null);
+        checkJobJobStatuses(sesId, RUNNING, RUNNING);
+
+        switch (exp) {
+            case FINISHED:
+                // Just letting job (WaitJob) finish on node0.
+                waitJob0.waitFut.onDone();
+                break;
+
+            case FAILED:
+                // Finish the job (WaitJob) with an error on node0.
+                waitJob0.waitFut.onDone(new Exception("from test"));
+                break;
+
+            case CANCELLED:
+                // Cancel the job (WaitJob) on node0.
+                node0.context().job().cancelJob(
+                    sesId,
+                    spiEx0.waitJobFut.result().getJobContext().getJobId(),
+                    false
+                );
+                break;
+
+            case SUSPENDED:
+                // Hold (suspend) the job (WaitJob) on node0.
+                spiEx0.waitJobFut.result().getJobContext().holdcc();
+                break;
+
+            default:
+                fail("Unknown: " + exp);
+        }
+
+        // Let's wait a bit for the operation (above) to complete.
+        // NOTE(review): a fixed delay is timing-dependent; polling for the expected statistics would be sturdier.
+        U.sleep(100);
+
+        checkTaskJobStatuses(sesId, exp, null);
+
+        if (exp == SUSPENDED) {
+            // Must resume (unhold) the job (WaitJob) to finish correctly.
+            checkJobJobStatuses(sesId, exp, RUNNING);
+            waitJob0.waitFut.onDone();
+            spiEx0.waitJobFut.result().getJobContext().callcc();
+
+            U.sleep(100);
+
+            checkTaskJobStatuses(sesId, FINISHED, null);
+        }
+
+        // Let's check that the job (WaitJob) on the node0 has finished
+        // and that the statistics about it will be empty (on node0).
+        checkJobJobStatuses(sesId, null, RUNNING);
+
+        // Let's finish the job (WaitJob) on node1.
+        waitJob1.waitFut.onDone();
+
+        taskFut.get(getTestTimeout());
+
+        // After the completion of the task, we will no longer receive statistics about it.
+        checkTaskJobStatuses(sesId, null, null);
+        checkJobJobStatuses(sesId, null, null);
+    }
+
+    /**
+     * Checks the job statistics reported by {@link GridTaskProcessor#jobStatuses} on both nodes.
+     *
+     * @param sesId Task session ID.
+     * @param expN0 Expected status of the only job on node0, {@code null} if no statistics are expected.
+     * @param expN1 Expected status of the only job on node1, {@code null} if no statistics are expected.
+     */
+    private void checkTaskJobStatuses(
+        IgniteUuid sesId,
+        @Nullable ComputeJobStatusEnum expN0,
+        @Nullable ComputeJobStatusEnum expN1
+    ) {
+        assertEqualsMaps(expStatuses(expN0), node0.context().task().jobStatuses(sesId));
+        assertEqualsMaps(expStatuses(expN1), node1.context().task().jobStatuses(sesId));
+    }
+
+    /**
+     * Checks the job statistics reported by {@link GridJobProcessor#jobStatuses} on both nodes.
+     *
+     * @param sesId Task session ID.
+     * @param expN0 Expected status of the only job on node0, {@code null} if no statistics are expected.
+     * @param expN1 Expected status of the only job on node1, {@code null} if no statistics are expected.
+     */
+    private void checkJobJobStatuses(
+        IgniteUuid sesId,
+        @Nullable ComputeJobStatusEnum expN0,
+        @Nullable ComputeJobStatusEnum expN1
+    ) {
+        assertEqualsMaps(expStatuses(expN0), node0.context().job().jobStatuses(sesId));
+        assertEqualsMaps(expStatuses(expN1), node1.context().job().jobStatuses(sesId));
+    }
+
+    /**
+     * @param exp Expected status of the only job, {@code null} if no statistics are expected.
+     * @return Expected statistics: a single job counted in status {@code exp}, or an empty map.
+     */
+    private static Map<ComputeJobStatusEnum, Long> expStatuses(@Nullable ComputeJobStatusEnum exp) {
+        return exp == null ? emptyMap() : F.asMap(exp, 1L);
+    }
+
+    /**
+     * Applies {@code c} to the collision SPI of every node returned by {@link G#allGrids()}.
+     *
+     * @param c Action to apply.
+     * @throws Exception If the action failed.
+     */
+    private void applyAllNodes(ConsumerX<PriorityQueueCollisionSpiEx> c) throws Exception {
+        for (Ignite n : G.allGrids())
+            c.accept(PriorityQueueCollisionSpiEx.spiEx(n));
+    }
+
+    /**
+     * Collision SPI that keeps jobs queued until {@link #handleCollisions()} is called and lets the test wait
+     * for a job of a given class to show up among the waiting jobs.
+     */
+    private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
+        /** Whether collisions are passed on to {@link PriorityQueueCollisionSpi} (which may activate jobs). */
+        volatile boolean handleCollision;
+
+        /** Class of the job to look for among waiting jobs, {@code null} if nothing is awaited. */
+        @Nullable volatile Class<? extends ComputeJob> waitJobCls;
+
+        /** Completed with the context of the first waiting job that is an instance of {@link #waitJobCls}. */
+        final GridFutureAdapter<CollisionJobContext> waitJobFut = new GridFutureAdapter<>();
+
+        /** {@inheritDoc} */
+        @Override public void onCollision(CollisionContext ctx) {
+            if (!waitJobFut.isDone()) {
+                // Read the volatile field once so the null check and the filter see the same value.
+                Class<? extends ComputeJob> waitJobCls = this.waitJobCls;
+
+                if (waitJobCls != null)
+                    ctx.waitingJobs().stream()
+                        .filter(jobCtx -> waitJobCls.isInstance(jobCtx.getJob()))
+                        .findAny()
+                        .ifPresent(waitJobFut::onDone);
+            }
+
+            if (handleCollision)
+                super.onCollision(ctx);
+        }
+
+        /** Stops handling collisions, forgets the awaited job class and resets {@link #waitJobFut}. */
+        void reset() {
+            handleCollision = false;
+
+            waitJobCls = null;
+
+            waitJobFut.reset();
+        }
+
+        /** Enables collision handling and triggers it immediately via the registered external listener. */
+        void handleCollisions() {
+            handleCollision = true;
+
+            GridCollisionManager collision = ((IgniteEx)ignite).context().collision();
+
+            AtomicReference<CollisionExternalListener> extLsnr = getFieldValue(collision, "extLsnr");
+
+            CollisionExternalListener lsnr = extLsnr.get();
+
+            assertNotNull(lsnr);
+
+            lsnr.onExternalCollision();
+        }
+
+        /**
+         * @return Job found by {@link #waitJobFut}; the caller chooses the expected job type.
+         */
+        @SuppressWarnings("unchecked")
+        <T> T task() {
+            return (T)waitJobFut.result().getJob();
+        }
+
+        /**
+         * @param n Node.
+         * @return Collision SPI from the node configuration.
+         */
+        static PriorityQueueCollisionSpiEx spiEx(Ignite n) {
+            return ((PriorityQueueCollisionSpiEx)n.configuration().getCollisionSpi());
+        }
+    }
+
+    /** Consumer that is allowed to throw checked exceptions. */
+    @FunctionalInterface
+    private interface ConsumerX<T> {
+        /**
+         * @param t Argument.
+         * @throws Exception If failed.
+         */
+        void accept(T t) throws Exception;
+    }
+
+    /** Task that sends one job, created by {@link #jobFactory}, to every node of the subgrid. */
+    private static class SimpleTask extends ComputeTaskAdapter<Void, Void> {
+        /** Job factory. */
+        final Supplier<? extends ComputeJob> jobFactory;
+
+        /**
+         * @param factory Job factory.
+         */
+        private SimpleTask(Supplier<? extends ComputeJob> factory) {
+            jobFactory = factory;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            Void arg
+        ) throws IgniteException {
+            return subgrid.stream().collect(toMap(n -> jobFactory.get(), identity()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Always {@link ComputeJobResultPolicy#WAIT}, regardless of the job outcome, instead of the adapter's
+         * default policy. NOTE(review): presumably so that failed/cancelled jobs do not fail the task.
+         */
+        @Override public ComputeJobResultPolicy result(
+            ComputeJobResult res,
+            List<ComputeJobResult> rcvd
+        ) throws IgniteException {
+            return ComputeJobResultPolicy.WAIT;
+        }
+    }
+
+    /**
+     * Job that signals {@link #onStartFut} when it starts and then blocks until {@link #waitFut} completes
+     * (or {@link #waitTimeout} elapses). It is {@link Externalizable} so that only the timeout is marshalled;
+     * the futures are created anew by the public no-arg constructor.
+     */
+    private static class WaitJob extends ComputeJobAdapter implements Externalizable {
+        /** Completed when {@link #execute()} starts. */
+        final GridFutureAdapter<Void> onStartFut;
+
+        /** Future the job waits for; completing it with an error makes the job fail. */
+        final GridFutureAdapter<Void> waitFut;
+
+        /** Maximum time to wait for {@link #waitFut}. */
+        long waitTimeout;
+
+        /** Public no-arg constructor, required by {@link Externalizable}. */
+        public WaitJob() {
+            onStartFut = new GridFutureAdapter<>();
+            waitFut = new GridFutureAdapter<>();
+        }
+
+        /**
+         * @param timeout Maximum time to wait for {@link #waitFut}.
+         */
+        private WaitJob(long timeout) {
+            this();
+
+            waitTimeout = timeout;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            onStartFut.onDone();
+
+            try {
+                waitFut.get(waitTimeout);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(waitTimeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            waitTimeout = in.readLong();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index cf7a573..2e6bcfe 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -1878,6 +1878,22 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
/**
+ * @param exp Expected.
+ * @param act Actual.
+ */
+ protected static void assertEqualsMaps(Map<?, ?> exp, Map<?, ?> act) {
+ if (exp.size() != act.size())
+ fail("Maps are not equal:\nExpected:\t" + exp + "\nActual:\t" + act);
+
+ for (Map.Entry<?, ?> e : exp.entrySet()) {
+ if (!act.containsKey(e.getKey()))
+ fail("Maps are not equal (missing key " + e.getKey() + "):\nExpected:\t" + exp + "\nActual:\t" + act);
+ else if (!F.eq(e.getValue(), act.get(e.getKey())))
+ fail("Maps are not equal (key " + e.getKey() + "):\nExpected:\t" + exp + "\nActual:\t" + act);
+ }
+ }
+
+ /**
* @param ignite Ignite instance.
* @param clo Closure.
* @return Result of closure execution.
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 61cc2a4..61c1899 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -79,6 +79,8 @@ import org.apache.ignite.internal.VisorManagementEventSelfTest;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfTest;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
+import org.apache.ignite.internal.processors.compute.ComputeGridMonitorTest;
+import org.apache.ignite.internal.processors.compute.ComputeJobStatusTest;
import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorConfigurationSelfTest;
import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
@@ -172,7 +174,10 @@ import org.junit.runners.Suite;
IgniteComputeJobOneThreadTest.class,
- VisorManagementEventSelfTest.class
+ VisorManagementEventSelfTest.class,
+
+ ComputeGridMonitorTest.class,
+ ComputeJobStatusTest.class
})
public class IgniteComputeGridTestSuite {
}