You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2022/01/21 14:11:26 UTC
[ignite] branch master updated: IGNITE-16339 Dynamic task reprioritization via changing session priority attribute - Fixes #9752.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 02a322e IGNITE-16339 Dynamic task reprioritization via changing session priority attribute - Fixes #9752.
02a322e is described below
commit 02a322e18e1e494bb658c1867e61747aa267f852
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jan 21 15:51:05 2022 +0300
IGNITE-16339 Dynamic task reprioritization via changing session priority attribute - Fixes #9752.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../ignite/internal/GridTaskSessionImpl.java | 11 +-
.../managers/collision/GridCollisionManager.java | 14 +-
.../processors/job/GridJobEventListener.java | 11 +-
.../internal/processors/job/GridJobProcessor.java | 104 ++++++++-
.../internal/processors/job/GridJobWorker.java | 3 +
.../processors/task/GridTaskProcessor.java | 15 +-
.../priorityqueue/PriorityQueueCollisionSpi.java | 92 +++-----
.../GridMultithreadedJobStealingSelfTest.java | 76 +++---
.../processors/compute/ComputeGridMonitorTest.java | 13 +-
.../compute/ComputeJobChangePriorityTest.java | 259 +++++++++++++++++++++
.../ComputeTaskWithWithoutFullSupportTest.java | 142 +++++++++++
.../internal/processors/compute/NoopJob.java} | 27 +--
.../collision/GridTestCollisionTaskSession.java | 140 +++++------
.../testsuites/IgniteComputeGridTestSuite.java | 6 +-
14 files changed, 707 insertions(+), 206 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
index 1088032..25742b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTaskSessionAttributeListener;
+import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.compute.ComputeTaskSessionScope;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -105,7 +106,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
/** */
private final AtomicInteger usage = new AtomicInteger(1);
- /** */
+ /** Task supports session attributes and checkpoints (there is {@link ComputeTaskSessionFullSupport}). */
private final boolean fullSup;
/** */
@@ -211,13 +212,17 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
}
/**
+ * Checks that the task supports session attributes and checkpoints
+ * (there is {@link ComputeTaskSessionFullSupport}).
*
+ * @throws IllegalStateException If not supported.
*/
protected void checkFullSupport() {
- if (!fullSup)
+ if (!fullSup) {
throw new IllegalStateException("Sessions attributes and checkpoints are disabled by default " +
"for better performance (to enable, annotate task class with " +
- "@ComputeTaskSessionFullSupport annotation).");
+ "@" + ComputeTaskSessionFullSupport.class.getSimpleName() + " annotation).");
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java
index 2282e60..0e41221 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/collision/GridCollisionManager.java
@@ -104,27 +104,33 @@ public class GridCollisionManager extends GridManagerAdapter<CollisionSpi> {
}
/**
- * @param waitJobs List of waiting jobs.
- * @param activeJobs List of active jobs.
- * @param heldJobs List of held jobs.
+ * Invoke collision SPI.
+ *
+ * @param waitJobs Collection of waiting jobs.
+ * @param activeJobs Collection of active jobs.
+ * @param heldJobs Collection of held jobs.
*/
public void onCollision(
final Collection<CollisionJobContext> waitJobs,
final Collection<CollisionJobContext> activeJobs,
- final Collection<CollisionJobContext> heldJobs) {
+ final Collection<CollisionJobContext> heldJobs
+ ) {
if (enabled()) {
if (log.isDebugEnabled())
log.debug("Resolving job collisions [waitJobs=" + waitJobs + ", activeJobs=" + activeJobs + ']');
getSpi().onCollision(new CollisionContext() {
+ /** {@inheritDoc} */
@Override public Collection<CollisionJobContext> activeJobs() {
return activeJobs;
}
+ /** {@inheritDoc} */
@Override public Collection<CollisionJobContext> waitingJobs() {
return waitJobs;
}
+ /** {@inheritDoc} */
@Override public Collection<CollisionJobContext> heldJobs() {
return heldJobs;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
index 3eed487..32af38b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
@@ -24,17 +24,22 @@ import java.util.EventListener;
*/
interface GridJobEventListener extends EventListener {
/**
+ * @param worker Job worker.
+ */
+ void onJobQueued(GridJobWorker worker);
+
+ /**
* @param worker Started job worker.
*/
- public void onJobStarted(GridJobWorker worker);
+ void onJobStarted(GridJobWorker worker);
/**
* @param worker Job worker.
*/
- public void onBeforeJobResponseSent(GridJobWorker worker);
+ void onBeforeJobResponseSent(GridJobWorker worker);
/**
* @param worker Finished job worker.
*/
- public void onJobFinished(GridJobWorker worker);
+ void onJobFinished(GridJobWorker worker);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 5abe0dc..122b2ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.GridTaskSessionRequest;
import org.apache.ignite.internal.SkipDaemon;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.collision.GridCollisionJobContextAdapter;
+import org.apache.ignite.internal.managers.collision.GridCollisionManager;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -90,6 +91,8 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.spi.collision.CollisionSpi;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
import org.apache.ignite.spi.metric.DoubleMetric;
import org.apache.ignite.spi.systemview.view.ComputeJobView;
import org.apache.ignite.spi.systemview.view.ComputeJobView.ComputeJobState;
@@ -166,7 +169,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
/** */
private final Marshaller marsh;
- /** */
+ /** Collision SPI is not available: {@link GridCollisionManager#enabled()} {@code == false}. */
private final boolean jobAlwaysActivate;
/** */
@@ -298,6 +301,18 @@ public class GridJobProcessor extends GridProcessorAdapter {
private final ThreadLocal<GridJobSessionImpl> currSess = new ThreadLocal<>();
/**
+ * {@link PriorityQueueCollisionSpi#getPriorityAttributeKey} or
+ * {@code null} if the {@link PriorityQueueCollisionSpi} is not configured.
+ */
+ @Nullable private final String taskPriAttrKey;
+
+ /**
+ * {@link PriorityQueueCollisionSpi#getJobPriorityAttributeKey} or
+ * {@code null} if the {@link PriorityQueueCollisionSpi} is not configured.
+ */
+ @Nullable private final String jobPriAttrKey;
+
+ /**
* @param ctx Kernal context.
*/
public GridJobProcessor(GridKernalContext ctx) {
@@ -352,6 +367,17 @@ public class GridJobProcessor extends GridProcessorAdapter {
return new ComputeJobView(e.getKey(), e.getValue(), state);
});
+
+ CollisionSpi collisionSpi = ctx.config().getCollisionSpi();
+
+ if (!jobAlwaysActivate && collisionSpi instanceof PriorityQueueCollisionSpi) {
+ taskPriAttrKey = ((PriorityQueueCollisionSpi)collisionSpi).getPriorityAttributeKey();
+ jobPriAttrKey = ((PriorityQueueCollisionSpi)collisionSpi).getJobPriorityAttributeKey();
+ }
+ else {
+ taskPriAttrKey = null;
+ jobPriAttrKey = null;
+ }
}
/** {@inheritDoc} */
@@ -872,24 +898,29 @@ public class GridJobProcessor extends GridProcessorAdapter {
ctx.collision().onCollision(
// Passive jobs view.
new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ /** {@inheritDoc} */
@NotNull @Override public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
final Iterator<GridJobWorker> iter = passiveJobs.values().iterator();
return new Iterator<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ /** {@inheritDoc} */
@Override public boolean hasNext() {
return iter.hasNext();
}
+ /** {@inheritDoc} */
@Override public org.apache.ignite.spi.collision.CollisionJobContext next() {
return new CollisionJobContext(iter.next(), true);
}
+ /** {@inheritDoc} */
@Override public void remove() {
throw new UnsupportedOperationException();
}
};
}
+ /** {@inheritDoc} */
@Override public int size() {
return passiveJobs.size();
}
@@ -897,6 +928,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
// Active jobs view.
new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ /** {@inheritDoc} */
@NotNull @Override public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
final Iterator<GridJobWorker> iter = activeJobs.values().iterator();
@@ -926,10 +958,12 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
}
+ /** {@inheritDoc} */
@Override public boolean hasNext() {
return w != null;
}
+ /** {@inheritDoc} */
@Override public org.apache.ignite.spi.collision.CollisionJobContext next() {
if (w == null)
throw new NoSuchElementException();
@@ -943,21 +977,24 @@ public class GridJobProcessor extends GridProcessorAdapter {
return ret;
}
+ /** {@inheritDoc} */
@Override public void remove() {
throw new UnsupportedOperationException();
}
};
}
+ /** {@inheritDoc} */
@Override public int size() {
int ret = activeJobs.size() - heldJobs.size();
- return ret > 0 ? ret : 0;
+ return Math.max(ret, 0);
}
},
// Held jobs view.
new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ /** {@inheritDoc} */
@NotNull @Override public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
final Iterator<GridJobWorker> iter = activeJobs.values().iterator();
@@ -987,10 +1024,12 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
}
+ /** {@inheritDoc} */
@Override public boolean hasNext() {
return w != null;
}
+ /** {@inheritDoc} */
@Override public org.apache.ignite.spi.collision.CollisionJobContext next() {
if (w == null)
throw new NoSuchElementException();
@@ -1005,12 +1044,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
return ret;
}
+ /** {@inheritDoc} */
@Override public void remove() {
throw new UnsupportedOperationException();
}
};
}
+ /** {@inheritDoc} */
@Override public int size() {
return heldJobs.size();
}
@@ -1688,6 +1729,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
synchronized (ses) {
ses.setInternal(attrs);
}
+
+ onChangeTaskAttributes(req.getSessionId(), req.getJobId(), attrs);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to deserialize session attributes.", e);
@@ -1944,6 +1987,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
private final GridMessageListener sesLsnr = new JobSessionListener();
/** {@inheritDoc} */
+ @Override public void onJobQueued(GridJobWorker worker) {
+ if (worker.getSession().isFullSupport()) {
+ // Register session request listener for this job.
+ ctx.io().addMessageListener(worker.getJobTopic(), sesLsnr);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void onJobStarted(GridJobWorker worker) {
if (log.isDebugEnabled())
log.debug("Received onJobStarted() callback: " + worker);
@@ -1954,10 +2005,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
// Register for timeout notifications.
if (worker.endTime() < Long.MAX_VALUE)
ctx.timeout().addTimeoutObject(worker);
-
- if (worker.getSession().isFullSupport())
- // Register session request listener for this job.
- ctx.io().addMessageListener(worker.getJobTopic(), sesLsnr);
}
/** {@inheritDoc} */
@@ -2294,6 +2341,51 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
/**
+ * Callback on changing task attributes.
+ *
+ * @param sesId Session ID.
+ * @param jobId Job ID.
+ * @param attrs Changed attributes.
+ */
+ public void onChangeTaskAttributes(IgniteUuid sesId, IgniteUuid jobId, Map<?, ?> attrs) {
+ if (!rwLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Callback on changing the task attributes will be ignored " +
+ "(node is in the process of stopping): " + sesId);
+
+ return;
+ }
+
+ try {
+ if (jobAlwaysActivate || (taskPriAttrKey == null && jobPriAttrKey == null))
+ return;
+
+ GridJobWorker jobWorker = passiveJobs.get(jobId);
+
+ if (jobWorker == null || jobWorker.isInternal())
+ return;
+
+ boolean handleCollisions = false;
+
+ if (taskPriAttrKey != null && attrs.containsKey(taskPriAttrKey)) {
+ // See PriorityQueueCollisionSpi#bumpPriority.
+ jobWorker.getSession().setAttribute(jobPriAttrKey, attrs.get(taskPriAttrKey));
+
+ handleCollisions = true;
+ }
+
+ if (!handleCollisions && jobPriAttrKey != null && attrs.containsKey(jobPriAttrKey))
+ handleCollisions = true;
+
+ if (handleCollisions)
+ handleCollisions();
+ }
+ finally {
+ rwLock.readUnlock();
+ }
+ }
+
+ /**
* @param sesId Task session ID.
* @return Job statistics for the task. Mapping: Job status -> count of jobs.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index cb2a5f4..09ef5ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -490,6 +490,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
// Inject resources.
ctx.resource().inject(dep, taskCls, job, ses, jobCtx);
+ // Event notification.
+ evtLsnr.onJobQueued(this);
+
if (!internal && ctx.event().isRecordable(EVT_JOB_QUEUED))
recordEvent(EVT_JOB_QUEUED, "Job got queued for computation.");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 9624af8..0258114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1057,7 +1057,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
UUID nodeId = sib.nodeId();
- if (!nodeId.equals(locNodeId) && !sib.isJobDone() && !rcvrs.contains(nodeId))
+ if (!nodeId.equals(locNodeId) && !sib.isJobDone())
rcvrs.add(nodeId);
}
}
@@ -1086,8 +1086,12 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
UUID nodeId = sib.nodeId();
- // Pair can be null if job is finished.
- if (rcvrs.remove(nodeId)) {
+ if (locNodeId.equals(nodeId)) {
+ // Local job notification.
+ ctx.job().onChangeTaskAttributes(ses.getId(), s.getJobId(), attrs);
+ }
+ else if (rcvrs.remove(nodeId)) {
+ // Pair can be null if job is finished.
ClusterNode node = ctx.discovery().node(nodeId);
// Check that node didn't change (it could happen in case of failover).
@@ -1096,9 +1100,10 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
GridTaskSessionRequest req = new GridTaskSessionRequest(
ses.getId(),
- null,
+ s.getJobId(),
loc ? null : U.marshal(marsh, attrs),
- attrs);
+ attrs
+ );
// Make sure to go through IO manager always, since order
// should be preserved here.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
index d4247e4..a40e3c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/priorityqueue/PriorityQueueCollisionSpi.java
@@ -17,7 +17,6 @@
package org.apache.ignite.spi.collision.priorityqueue;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -27,7 +26,7 @@ import java.util.List;
import java.util.Map;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.GridTaskSessionInternal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -239,7 +238,8 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
private volatile boolean preventStarvation = DFLT_PREVENT_STARVATION_ENABLED;
/** Cached priority comparator instance. */
- private Comparator<GridCollisionJobContextWrapper> priComp;
+ private final Comparator<GridCollisionJobContextWrapper> priComp =
+ Comparator.comparing(w -> getJobPriority(w.ctx), Comparator.reverseOrder());
/** */
@LoggerResource
@@ -530,7 +530,7 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
}
}
else {
- Collections.sort(waitSnap, priorityComparator());
+ waitSnap.sort(priComp);
waitSnapSorted = true;
if (preventStarvation)
@@ -551,7 +551,7 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
int skip = waitSnap.size() - waitSize;
if (!waitSnapSorted)
- Collections.sort(waitSnap, priorityComparator());
+ waitSnap.sort(priComp);
int i = 0;
@@ -604,7 +604,8 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
/**
* Gets job priority. At first tries to get from job context. If job context has no priority,
- * then tries to get from task session. If task session has no priority default one will be used.
+ * then tries to get from task session. If task session does not support attributes or there
+ * is no priority in them, then the default priority is used.
*
* @param ctx Collision job context.
* @return Job priority.
@@ -612,43 +613,49 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
private int getJobPriority(CollisionJobContext ctx) {
assert ctx != null;
- Integer p = null;
+ Integer pri = null;
- ComputeJobContext jctx = ctx.getJobContext();
+ ComputeJobContext jobCtx = ctx.getJobContext();
try {
- p = (Integer)jctx.getAttribute(jobPriAttrKey);
+ pri = jobCtx.getAttribute(jobPriAttrKey);
}
catch (ClassCastException e) {
LT.error(log, e, "Type of job context priority attribute '" + jobPriAttrKey +
- "' is not java.lang.Integer [type=" + jctx.getAttribute(jobPriAttrKey).getClass() + ']');
+ "' is not java.lang.Integer [type=" + jobCtx.getAttribute(jobPriAttrKey).getClass() + ']');
}
- if (p == null) {
- ComputeTaskSession ses = ctx.getTaskSession();
+ if (pri == null) {
+ GridTaskSessionInternal taskSes = (GridTaskSessionInternal)ctx.getTaskSession();
- try {
- p = (Integer)ses.getAttribute(taskPriAttrKey);
- }
- catch (ClassCastException e) {
- LT.error(log, e, "Type of task session priority attribute '" + taskPriAttrKey +
- "' is not java.lang.Integer [type=" + ses.getAttribute(taskPriAttrKey).getClass() + ']');
- }
+ if (!taskSes.isFullSupport()) {
+ if (log.isDebugEnabled())
+ log.debug("Task does not support session attributes (will use default priority): " + dfltPri);
- if (p == null) {
- if (log.isDebugEnabled()) {
- log.debug("Failed get priority from job context attribute '" + jobPriAttrKey +
- "' and task session attribute '" + taskPriAttrKey + "' (will use default priority): " +
- dfltPri);
+ pri = dfltPri;
+ }
+ else {
+ try {
+ pri = taskSes.getAttribute(taskPriAttrKey);
+ }
+ catch (ClassCastException e) {
+ LT.error(log, e, "Type of task session priority attribute '" + taskPriAttrKey +
+ "' is not java.lang.Integer [type=" + taskSes.getAttribute(taskPriAttrKey).getClass() + ']');
}
- p = dfltPri;
+ if (pri == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed get priority from job context attribute '" + jobPriAttrKey +
+ "' and task session attribute '" + taskPriAttrKey + "' (will use default priority): " +
+ dfltPri);
+ }
+
+ pri = dfltPri;
+ }
}
}
- assert p != null;
-
- return p;
+ return pri;
}
/** {@inheritDoc} */
@@ -656,19 +663,6 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
return Collections.singletonList(createSpiAttributeName(PRIORITY_ATTRIBUTE_KEY));
}
- /**
- * Returns (possibly shared) comparator fo sorting GridCollisionJobContextWrapper
- * by priority.
- *
- * @return Comparator for priority sorting.
- */
- private Comparator<GridCollisionJobContextWrapper> priorityComparator() {
- if (priComp == null)
- priComp = new PriorityGridCollisionJobContextComparator();
-
- return priComp;
- }
-
/** {@inheritDoc} */
@Override public PriorityQueueCollisionSpi setName(String name) {
super.setName(name);
@@ -682,22 +676,6 @@ public class PriorityQueueCollisionSpi extends IgniteSpiAdapter implements Colli
}
/**
- * Comparator for by priority comparison of collision contexts.
- */
- private class PriorityGridCollisionJobContextComparator implements Comparator<GridCollisionJobContextWrapper>, Serializable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public int compare(GridCollisionJobContextWrapper o1, GridCollisionJobContextWrapper o2) {
- int p1 = getJobPriority(o1.getContext());
- int p2 = getJobPriority(o2.getContext());
-
- return p1 < p2 ? 1 : p1 == p2 ? 0 : -1;
- }
- }
-
- /**
* Wrapper class to keep original collision context position.
*/
private static class GridCollisionJobContextWrapper {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
index fcee6df..be45162 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
@@ -68,14 +68,41 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
ignite = startGridsMultiThreaded(2);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
- ignite = null;
+ super.afterTest();
stopAllGrids();
+
+ ignite = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi();
+
+ // One job at a time.
+ colSpi.setActiveJobsThreshold(1);
+ colSpi.setWaitJobsThreshold(0);
+
+ JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi();
+
+ // Verify defaults.
+ assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS;
+
+ cfg.setCollisionSpi(colSpi);
+ cfg.setFailoverSpi(failSpi);
+
+ return cfg;
}
/**
@@ -94,7 +121,9 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
int threadsNum = 10;
GridTestUtils.runMultiThreaded(new Runnable() {
- /** */
+ /**
+ *
+ */
@Override public void run() {
try {
JobStealingResult res = ignite.compute().execute(new JobStealingTask(2), null);
@@ -122,7 +151,7 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
// Total jobs number is threadsNum * 2
assertEquals("Incorrect processed jobs number", threadsNum * 2, stolen.get() + noneStolen.get());
- assertFalse( "No jobs were stolen.", stolen.get() == 0);
+ assertFalse("No jobs were stolen.", stolen.get() == 0);
for (Ignite g : G.allGrids())
assertTrue("Node get no jobs.", nodes.contains(g.name()));
@@ -130,7 +159,7 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
// Under these circumstances we should not have more than 2 jobs
// difference.
//(but muted to 4 due to very rare fails and low priority of fix)
- assertTrue( "Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
+ assertTrue("Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
Math.abs(stolen.get() - noneStolen.get()) <= 4);
}
@@ -154,7 +183,9 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
jobExecutedLatch = new CountDownLatch(threadsNum);
final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new Runnable() {
- /** */
+ /**
+ *
+ */
@Override public void run() {
try {
final IgniteCompute compute = ignite.compute().withAsync();
@@ -190,38 +221,16 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
assertNull("Test failed with exception: ", fail.get());
- // Total jobs number is threadsNum * 3
+ // Total jobs number is threadsNum * 4
assertEquals("Incorrect processed jobs number", threadsNum * jobsPerTask, stolen.get() + noneStolen.get());
- assertFalse( "No jobs were stolen.", stolen.get() == 0);
+ assertFalse("No jobs were stolen.", stolen.get() == 0);
for (Ignite g : G.allGrids())
assertTrue("Node get no jobs.", nodes.contains(g.name()));
- assertTrue( "Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
- Math.abs(stolen.get() - 2 * noneStolen.get()) <= 6);
- }
-
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi();
-
- // One job at a time.
- colSpi.setActiveJobsThreshold(1);
- colSpi.setWaitJobsThreshold(0);
-
- JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi();
-
- // Verify defaults.
- assert failSpi.getMaximumFailoverAttempts() == JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS;
-
- cfg.setCollisionSpi(colSpi);
- cfg.setFailoverSpi(failSpi);
-
- return cfg;
+ assertTrue("Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
+ Math.abs(stolen.get() - 2 * noneStolen.get()) <= 8);
}
/**
@@ -300,7 +309,10 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
/** {@inheritDoc} */
@Override public Serializable execute() {
try {
- jobExecutedLatch.countDown();
+ CountDownLatch latch = jobExecutedLatch;
+
+ if (latch != null)
+ latch.countDown();
Long sleep = argument(0);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
index ebb5be1..44a06a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.compute.ComputeTaskFuture;
@@ -320,7 +319,7 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest {
List<ClusterNode> subgrid,
Void arg
) throws IgniteException {
- return subgrid.stream().collect(toMap(n -> new NoopComputeJob(), identity()));
+ return subgrid.stream().collect(toMap(n -> new NoopJob(), identity()));
}
/** {@inheritDoc} */
@@ -350,7 +349,7 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest {
) throws IgniteException {
doneOnMapFut.onDone();
- return subgrid.stream().collect(toMap(n -> new NoopComputeJob() {
+ return subgrid.stream().collect(toMap(n -> new NoopJob() {
/** {@inheritDoc} */
@Override public Object execute() throws IgniteException {
try {
@@ -371,14 +370,6 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest {
}
}
- /** */
- private static class NoopComputeJob extends ComputeJobAdapter {
- /** {@inheritDoc} */
- @Override public Object execute() throws IgniteException {
- return null;
- }
- }
-
/**
* @param session Task session.
* @param snapshot Task status snapshot.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java
new file mode 100644
index 0000000..8727164
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.CollisionJobContext;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Class for testing job priority change through task session attributes.
+ */
+public class ComputeJobChangePriorityTest extends GridCommonAbstractTest {
+ /** Coordinator. */
+ private static IgniteEx CRD;
+
+ /** {@code GridJobProcessor#onChangeTaskAttributes} method, used to detect it in a call stack. */
+ private static Method ON_CHANGE_TASK_ATTRS_MTD;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ stopAllGrids();
+
+ IgniteEx crd = startGrids(2);
+
+ crd.cluster().state(ACTIVE);
+
+ awaitPartitionMapExchange();
+
+ CRD = crd;
+
+ ON_CHANGE_TASK_ATTRS_MTD = GridJobProcessor.class.getDeclaredMethod(
+ "onChangeTaskAttributes",
+ IgniteUuid.class,
+ IgniteUuid.class,
+ Map.class
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+
+ CRD = null;
+ ON_CHANGE_TASK_ATTRS_MTD = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ for (Ignite n : G.allGrids())
+ PriorityQueueCollisionSpiEx.spiEx(n).reset();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setFailureHandler(new StopNodeFailureHandler())
+ .setCollisionSpi(new PriorityQueueCollisionSpiEx())
+ .setMetricsUpdateFrequency(Long.MAX_VALUE)
+ .setClientFailureDetectionTimeout(Long.MAX_VALUE);
+ }
+
+ /**
+ * Checking that when {@link PriorityQueueCollisionSpi#getPriorityAttributeKey} is changed,
+ * collisions will be handled.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testChangeTaskPriorityAttribute() throws Exception {
+ checkChangeAttributes(
+ PriorityQueueCollisionSpiEx.spiEx(CRD).getPriorityAttributeKey(),
+ 1,
+ true
+ );
+ }
+
+ /**
+ * Checking that when {@link PriorityQueueCollisionSpi#getJobPriorityAttributeKey} is changed,
+ * collisions will be handled.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testChangeJobPriorityAttribute() throws Exception {
+ checkChangeAttributes(
+ PriorityQueueCollisionSpiEx.spiEx(CRD).getJobPriorityAttributeKey(),
+ 1,
+ true
+ );
+ }
+
+ /**
+ * Checking that no collision handling will occur when a random attribute is changed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testChangeRandomAttribute() throws Exception {
+ checkChangeAttributes(
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ false
+ );
+ }
+
+ /** Starts {@link NoopTask}, sets session attribute {@code key} and checks whether it triggered collision handling. */
+ private void checkChangeAttributes(
+ String key,
+ Object val,
+ boolean expHandleCollisionOnChangeTaskAttrs
+ ) throws Exception {
+ ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(new NoopTask(), null);
+
+ for (Ignite n : G.allGrids())
+ PriorityQueueCollisionSpiEx.spiEx(n).waitJobFut.get(getTestTimeout()); // Until the SPI sees a waiting NoopJob.
+
+ for (Ignite n : G.allGrids())
+ PriorityQueueCollisionSpiEx.spiEx(n).handleCollision = true; // Collisions now reach the parent SPI.
+
+ taskFut.getTaskSession().setAttribute(key, val);
+
+ for (Ignite n : G.allGrids()) {
+ assertEquals(
+ val,
+ PriorityQueueCollisionSpiEx.spiEx(n).waitJobFut.result()
+ .getTaskSession().waitForAttribute(key, getTestTimeout()));
+ }
+
+ for (Ignite n : G.allGrids()) {
+ GridFutureAdapter<Void> fut = PriorityQueueCollisionSpiEx.spiEx(n).onChangeTaskAttrsFut;
+
+ if (expHandleCollisionOnChangeTaskAttrs)
+ fut.get(getTestTimeout());
+ else
+ assertThrows(log, () -> fut.get(100), IgniteFutureTimeoutCheckedException.class, null);
+ }
+
+ if (!expHandleCollisionOnChangeTaskAttrs) // NOTE(review): presumably forces a collision so waiting jobs can start - confirm.
+ CRD.compute().execute(new NoopTask(), null);
+
+ taskFut.get(getTestTimeout());
+ }
+
+ /** Collision SPI that delegates to the parent only when {@link #handleCollision} is set. */
+ private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
+ /** Whether {@link #onCollision} delegates to the parent SPI. */
+ volatile boolean handleCollision;
+
+ /** Completed with a waiting {@link NoopJob} context once {@link #onCollision} sees one. */
+ final GridFutureAdapter<CollisionJobContext> waitJobFut = new GridFutureAdapter<>();
+
+ /** Completed when a handled collision has {@code GridJobProcessor#onChangeTaskAttributes} on the call stack. */
+ final GridFutureAdapter<Void> onChangeTaskAttrsFut = new GridFutureAdapter<>();
+
+ /** {@inheritDoc} */
+ @Override public void onCollision(CollisionContext ctx) {
+ if (!waitJobFut.isDone()) {
+ ctx.waitingJobs().stream()
+ .filter(collisionJobCtx -> collisionJobCtx.getJob() instanceof NoopJob)
+ .findAny()
+ .ifPresent(waitJobFut::onDone);
+ }
+
+ if (handleCollision) {
+ if (!onChangeTaskAttrsFut.isDone()) {
+ Stream.of(new Exception().getStackTrace())
+ .filter(el ->
+ ON_CHANGE_TASK_ATTRS_MTD.getDeclaringClass().getName().equals(el.getClassName()) &&
+ ON_CHANGE_TASK_ATTRS_MTD.getName().equals(el.getMethodName())
+ )
+ .findAny()
+ .ifPresent(el -> onChangeTaskAttrsFut.onDone());
+ }
+
+ super.onCollision(ctx);
+ }
+ }
+
+ /** Clears {@link #handleCollision} and resets both futures. */
+ void reset() {
+ handleCollision = false;
+
+ waitJobFut.reset();
+
+ onChangeTaskAttrsFut.reset();
+ }
+
+ /** Returns the collision SPI configured on the given node as {@link PriorityQueueCollisionSpiEx}. */
+ static PriorityQueueCollisionSpiEx spiEx(Ignite n) {
+ return ((PriorityQueueCollisionSpiEx)((IgniteEx)n).context().config().getCollisionSpi());
+ }
+ }
+
+ /** Task with full session support that maps one {@link NoopJob} to every node of the subgrid. */
+ @ComputeTaskSessionFullSupport
+ private static class NoopTask extends ComputeTaskAdapter<Void, Void> {
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode> map(
+ List<ClusterNode> subgrid,
+ Void arg
+ ) throws IgniteException {
+ return subgrid.stream().collect(toMap(n -> new NoopJob(), identity()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+ return null;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeTaskWithWithoutFullSupportTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeTaskWithWithoutFullSupportTest.java
new file mode 100644
index 0000000..cc39cea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeTaskWithWithoutFullSupportTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.GridTaskSessionInternal;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+
+/**
+ * Class for checking that there will be no errors when starting tasks with/without
+ * {@link ComputeTaskSessionFullSupport}.
+ */
+public class ComputeTaskWithWithoutFullSupportTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setCollisionSpi(new PriorityQueueCollisionSpiEx().setParallelJobsNumber(1))
+ .setFailureHandler(new StopNodeFailureHandler())
+ .setMetricsUpdateFrequency(Long.MAX_VALUE)
+ .setClientFailureDetectionTimeout(Long.MAX_VALUE);
+ }
+
+ /**
+ * Checking that if there is {@link PriorityQueueCollisionSpi},
+ * it is possible to run tasks with and without {@link ComputeTaskSessionFullSupport}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void test() throws Exception {
+ IgniteEx n = startGrid(0);
+
+ n.cluster().state(ACTIVE);
+
+ ComputeTaskFuture<Void> taskFut0 = n.compute().executeAsync(new TaskWithFullSupport(), null);
+ assertTrue(((GridTaskSessionInternal)taskFut0.getTaskSession()).isFullSupport());
+
+ ((PriorityQueueCollisionSpiEx)n.configuration().getCollisionSpi()).handleCollision = true; // Handled only from here on.
+
+ ComputeTaskFuture<Void> taskFut1 = n.compute().executeAsync(new TaskWithoutFullSupport(), null);
+ assertFalse(((GridTaskSessionInternal)taskFut1.getTaskSession()).isFullSupport());
+
+ taskFut0.get(TimeUnit.SECONDS.toMillis(1));
+ taskFut1.get(TimeUnit.SECONDS.toMillis(1));
+ }
+
+ /** Collision SPI that delegates to the parent only when {@link #handleCollision} is set. */
+ private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
+ /** Whether {@link #onCollision} delegates to the parent SPI. */
+ volatile boolean handleCollision;
+
+ /** {@inheritDoc} */
+ @Override public void onCollision(CollisionContext ctx) {
+ if (handleCollision)
+ super.onCollision(ctx);
+ }
+ }
+
+ /** Task with {@link ComputeTaskSessionFullSupport} that maps one {@link NoopJob} to the first subgrid node. */
+ @ComputeTaskSessionFullSupport
+ private static class TaskWithFullSupport extends ComputeTaskAdapter<Void, Void> {
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode> map(
+ List<ClusterNode> subgrid,
+ Void arg
+ ) throws IgniteException {
+ assertFalse(subgrid.isEmpty());
+
+ return singletonMap(new NoopJob(), subgrid.get(0));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+ return null;
+ }
+ }
+
+ /** Task without {@link ComputeTaskSessionFullSupport} that maps one {@link NoopJob} to the first subgrid node. */
+ private static class TaskWithoutFullSupport extends ComputeTaskAdapter<Void, Void> {
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode> map(
+ List<ClusterNode> subgrid,
+ Void arg
+ ) throws IgniteException {
+ assertFalse(subgrid.isEmpty());
+
+ return singletonMap(new NoopJob(), subgrid.get(0));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+ return null;
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/NoopJob.java
similarity index 62%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
copy to modules/core/src/test/java/org/apache/ignite/internal/processors/compute/NoopJob.java
index 3eed487..48f8bc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobEventListener.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/NoopJob.java
@@ -15,26 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.job;
+package org.apache.ignite.internal.processors.compute;
-import java.util.EventListener;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobAdapter;
/**
- * Job event listener.
+ * Compute job that does nothing and returns {@code null}.
*/
-interface GridJobEventListener extends EventListener {
- /**
- * @param worker Started job worker.
- */
- public void onJobStarted(GridJobWorker worker);
-
- /**
- * @param worker Job worker.
- */
- public void onBeforeJobResponseSent(GridJobWorker worker);
-
- /**
- * @param worker Finished job worker.
- */
- public void onJobFinished(GridJobWorker worker);
+class NoopJob extends ComputeJobAdapter {
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ return null;
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
index 3a7d757..47c00d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
@@ -21,28 +21,33 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.compute.ComputeJobSibling;
-import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionAttributeListener;
import org.apache.ignite.compute.ComputeTaskSessionScope;
+import org.apache.ignite.internal.GridTaskSessionInternal;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
/**
* Test collision task session.
*/
-public class GridTestCollisionTaskSession implements ComputeTaskSession {
- /** */
+public class GridTestCollisionTaskSession implements GridTaskSessionInternal {
+ /** Task priority. */
private Integer pri = 0;
- /** */
+ /** Task priority attribute key. */
private String priAttrKey;
- /** */
+ /**
+ * Default constructor.
+ */
public GridTestCollisionTaskSession() {
// No-op.
}
/**
+ * Constructor.
+ *
* @param pri Priority.
* @param priAttrKey Priority attribute key.
*/
@@ -55,81 +60,68 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
/** {@inheritDoc} */
@Override public UUID getTaskNodeId() {
- assert false;
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public <K, V> V waitForAttribute(K key, long timeout) {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public boolean waitForAttribute(Object key, Object val, long timeout) throws InterruptedException {
- assert false : "Not implemented";
-
- return false;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public Map<?, ?> waitForAttributes(Collection<?> keys, long timeout) {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
- @Override public boolean waitForAttributes(Map<?, ?> attrs, long timeout) throws InterruptedException {
- assert false : "Not implemented";
-
- return false;
+ @Override public boolean waitForAttributes(Map<?, ?> attrs, long timeout) {
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public void saveCheckpoint(String key, Object state) {
- assert false : "Not implemented";
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public void saveCheckpoint(String key, Object state, ComputeTaskSessionScope scope, long timeout) {
- assert false : "Not implemented";
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
- @Override public void saveCheckpoint(String key, Object state, ComputeTaskSessionScope scope, long timeout,
- boolean overwrite) {
- assert false : "Not implemented";
+ @Override public void saveCheckpoint(
+ String key,
+ Object state,
+ ComputeTaskSessionScope scope,
+ long timeout,
+ boolean overwrite
+ ) {
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public <T> T loadCheckpoint(String key) {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public boolean removeCheckpoint(String key) {
- assert false : "Not implemented";
-
- return false;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public String getTaskName() {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public IgniteUuid getId() {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@@ -139,16 +131,12 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
/** {@inheritDoc} */
@Override public ClassLoader getClassLoader() {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public Collection<ComputeJobSibling> getJobSiblings() {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@@ -158,14 +146,12 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
/** {@inheritDoc} */
@Override public ComputeJobSibling getJobSibling(IgniteUuid jobId) {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public void setAttribute(Object key, Object val) {
- assert false : "Not implemented";
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@@ -178,33 +164,27 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
/** {@inheritDoc} */
@Override public void setAttributes(Map<?, ?> attrs) {
- assert false : "Not implemented";
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public Map<Object, Object> getAttributes() {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public void addAttributeListener(ComputeTaskSessionAttributeListener lsnr, boolean rewind) {
- assert false : "Not implemented";
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public boolean removeAttributeListener(ComputeTaskSessionAttributeListener lsnr) {
- assert false : "Not implemented";
-
- return false;
+ throw new UnsupportedOperationException("Not implemented");
}
/** {@inheritDoc} */
@Override public IgniteFuture<?> mapFuture() {
- assert false : "Not implemented";
-
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
/**
@@ -239,14 +219,42 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
}
/** {@inheritDoc} */
- @Override public String toString() {
- StringBuilder buf = new StringBuilder();
+ @Override public String getCheckpointSpi() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable IgniteUuid getJobId() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTaskNode() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClosed() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClosed() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridTaskSessionInternal session() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
- buf.append(getClass().getName());
- buf.append(" [priority=").append(pri);
- buf.append(", priorityAttrKey='").append(priAttrKey).append('\'');
- buf.append(']');
+ /** {@inheritDoc} */
+ @Override public boolean isFullSupport() {
+ return true;
+ }
- return buf.toString();
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return getClass().getName() + " [priority=" + pri + ", priorityAttrKey='" + priAttrKey + '\'' + ']';
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 61c1899..8ad91a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -80,7 +80,9 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfT
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
import org.apache.ignite.internal.processors.compute.ComputeGridMonitorTest;
+import org.apache.ignite.internal.processors.compute.ComputeJobChangePriorityTest;
import org.apache.ignite.internal.processors.compute.ComputeJobStatusTest;
+import org.apache.ignite.internal.processors.compute.ComputeTaskWithWithoutFullSupportTest;
import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorConfigurationSelfTest;
import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
@@ -177,7 +179,9 @@ import org.junit.runners.Suite;
VisorManagementEventSelfTest.class,
ComputeGridMonitorTest.class,
- ComputeJobStatusTest.class
+ ComputeJobChangePriorityTest.class,
+ ComputeJobStatusTest.class,
+ ComputeTaskWithWithoutFullSupportTest.class
})
public class IgniteComputeGridTestSuite {
}