You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2022/05/12 11:34:24 UTC
[ignite] branch master updated: IGNITE-16916 Graceful cancel of job workers instead of brutal interrupt - Fixes #10015.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9cbaf6c371d IGNITE-16916 Graceful cancel of job workers instead of brutal interrupt - Fixes #10015.
9cbaf6c371d is described below
commit 9cbaf6c371dd489b2243d1737b0bd325333af79d
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu May 12 14:26:22 2022 +0300
IGNITE-16916 Graceful cancel of job workers instead of brutal interrupt - Fixes #10015.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
modules/control-utility/pom.xml | 14 +
.../ignite/util/KillCommandsCommandShTest.java | 3 +
.../managers/discovery/GridDiscoveryManager.java | 4 +-
.../cache/GridCacheSharedTtlCleanupManager.java | 12 +-
.../cache/binary/BinaryMetadataFileStore.java | 2 +-
.../cache/persistence/checkpoint/Checkpointer.java | 8 +-
.../persistence/wal/FileWriteAheadLogManager.java | 22 +-
.../wal/filehandle/FileHandleManagerImpl.java | 6 +-
.../DistributedConfigurationProcessor.java | 5 +-
.../internal/processors/job/GridJobProcessor.java | 36 +-
.../internal/processors/job/GridJobWorker.java | 80 +++-
.../job/JobWorkerInterruptionTimeoutObject.java | 87 +++++
.../persistence/DmsDataWriterWorker.java | 4 +-
.../processors/timeout/GridTimeoutObject.java | 6 +-
.../processors/timeout/GridTimeoutProcessor.java | 2 +-
.../ignite/internal/util/StripedExecutor.java | 2 +-
.../ignite/internal/util/nio/GridNioServer.java | 2 +-
.../ignite/internal/util/worker/GridWorker.java | 54 ++-
.../internal/GridCancelOnGridStopSelfTest.java | 5 +-
.../internal/GridCancelUnusedJobSelfTest.java | 30 +-
.../internal/GridCancelledJobsMetricsSelfTest.java | 32 +-
.../GridFailoverCustomTopologySelfTest.java | 27 +-
.../GridMultithreadedJobStealingSelfTest.java | 3 +
.../internal/GridStopWithCancelSelfTest.java | 3 +
.../GridTaskFutureImplStopGridSelfTest.java | 19 +-
.../internal/metric/SystemViewComputeJobTest.java | 3 +
.../processors/compute/ComputeJobStatusTest.java | 6 +
.../compute/InterruptComputeJobTest.java | 416 +++++++++++++++++++++
...ridSessionCancelSiblingsFromFutureSelfTest.java | 60 +--
.../GridSessionCancelSiblingsFromJobSelfTest.java | 62 +--
.../GridSessionCancelSiblingsFromTaskSelfTest.java | 64 ++--
.../apache/ignite/testframework/GridTestUtils.java | 45 ++-
.../testframework/junits/GridAbstractTest.java | 2 +-
.../junits/common/GridCommonAbstractTest.java | 18 +
.../junits/spi/GridSpiAbstractTest.java | 4 +-
.../testsuites/IgniteComputeGridTestSuite.java | 4 +-
.../apache/ignite/util/KillCommandsMXBeanTest.java | 3 +
.../apache/ignite/util/KillCommandsSQLTest.java | 3 +
38 files changed, 949 insertions(+), 209 deletions(-)
diff --git a/modules/control-utility/pom.xml b/modules/control-utility/pom.xml
index 06163ce532a..033e2e7b178 100644
--- a/modules/control-utility/pom.xml
+++ b/modules/control-utility/pom.xml
@@ -130,6 +130,20 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
index 7c844ad9e49..017c2d2cf13 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
@@ -81,6 +81,9 @@ public class KillCommandsCommandShTest extends GridCommandHandlerClusterByClassA
cache.put(i, i);
awaitPartitionMapExchange();
+
+ // Lower the compute job interrupt timeout (to 100 ms) so cancelled jobs are interrupted quickly in this test.
+ computeJobWorkerInterruptTimeout(srvs.get(0)).propagate(100L);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 72f57af2628..f87beacf739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2898,7 +2898,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (!isInterruptedException)
U.error(log, "Exception in discovery notifier worker thread.", t);
- if (!isInterruptedException || !isCancelled) {
+ if (!isInterruptedException || !isCancelled.get()) {
FailureType type = t instanceof OutOfMemoryError ? CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION;
ctx.failure().process(new FailureContext(type, t));
@@ -3078,7 +3078,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
onIdle();
}
catch (InterruptedException e) {
- if (!isCancelled)
+ if (!isCancelled.get())
ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, e));
throw e;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
index da2b793e242..dc3aa396e8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -164,6 +164,12 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
cctx.exchange().affinityReadyFuture(AffinityTopologyVersion.ZERO).get();
}
catch (IgniteCheckedException ex) {
+ if (cctx.kernalContext().isStopping()) {
+ isCancelled.set(true);
+
+ return; // Node is stopped before affinity has prepared.
+ }
+
throw new IgniteException("Failed to wait for initialization topology [err="
+ ex.getMessage() + ']', ex);
}
@@ -214,13 +220,13 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
}
catch (Throwable t) {
if (X.hasCause(t, NodeStoppingException.class)) {
- isCancelled = true; // Treat node stopping as valid worker cancellation.
+ isCancelled.set(true); // Treat node stopping as valid worker cancellation.
return;
}
if (!(t instanceof IgniteInterruptedCheckedException || t instanceof InterruptedException)) {
- if (isCancelled)
+ if (isCancelled.get())
return;
err = t;
@@ -229,7 +235,7 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
throw t;
}
finally {
- if (err == null && !isCancelled)
+ if (err == null && !isCancelled.get())
err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
index c412a5dca2b..b994c4b7d06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
@@ -458,7 +458,7 @@ class BinaryMetadataFileStore {
body0();
}
catch (InterruptedException e) {
- if (!isCancelled) {
+ if (!isCancelled.get()) {
ctx.failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, e));
throw e;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index 051d51779bd..457c668964f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -294,7 +294,7 @@ public class Checkpointer extends GridWorker {
throw t;
}
finally {
- if (err == null && !(isCancelled))
+ if (err == null && !(isCancelled.get()))
err = new IllegalStateException("Thread is terminated unexpectedly: " + name());
if (err instanceof OutOfMemoryError)
@@ -826,7 +826,7 @@ public class Checkpointer extends GridWorker {
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
- isCancelled = true;
+ isCancelled.set(true);
}
}
@@ -887,7 +887,7 @@ public class Checkpointer extends GridWorker {
log.debug("Cancelling grid runnable: " + this);
// Do not interrupt runner thread.
- isCancelled = true;
+ isCancelled.set(true);
synchronized (this) {
notifyAll();
@@ -915,7 +915,7 @@ public class Checkpointer extends GridWorker {
public void shutdownNow() {
shutdownNow = true;
- if (!isCancelled)
+ if (!isCancelled.get())
cancel();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index f64b11ac8ea..6711b411a7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1890,7 +1890,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
private void shutdown() throws IgniteInterruptedCheckedException {
synchronized (this) {
- isCancelled = true;
+ isCancelled.set(true);
notifyAll();
}
@@ -1985,7 +1985,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
Thread.currentThread().interrupt();
synchronized (this) {
- isCancelled = true;
+ isCancelled.set(true);
}
}
catch (Throwable t) {
@@ -2141,7 +2141,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
public void restart() {
assert runner() == null : "FileArchiver is still running";
- isCancelled = false;
+ isCancelled.set(false);
new IgniteThread(archiver).start();
}
@@ -2249,7 +2249,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
void restart() {
assert runner() == null : "FileCompressorWorker is still running.";
- isCancelled = false;
+ isCancelled.set(false);
new IgniteThread(this).start();
}
@@ -2530,7 +2530,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " +
"[tmp=" + unzipTmp + ", raw=" + unzip + "]", e);
}
- else if (!isCancelled) {
+ else if (!isCancelled.get()) {
ex = new IgniteCheckedException("Error during WAL segment decompression [segmentIdx=" +
segmentToDecompress + "]", e);
}
@@ -2549,14 +2549,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (InterruptedException e) {
Thread.currentThread().interrupt();
- if (!isCancelled)
+ if (!isCancelled.get())
err = e;
}
catch (Throwable t) {
err = t;
}
finally {
- if (err == null && !isCancelled)
+ if (err == null && !isCancelled.get())
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
@@ -2605,7 +2605,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
void restart() {
assert runner() == null : "FileDecompressor is still running.";
- isCancelled = false;
+ isCancelled.set(false);
new IgniteThread(this).start();
}
@@ -3292,7 +3292,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (IgniteInterruptedCheckedException e) {
Thread.currentThread().interrupt();
- isCancelled = true;
+ isCancelled.set(true);
}
catch (Throwable t) {
err = t;
@@ -3314,7 +3314,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws IgniteInterruptedCheckedException If failed to wait for worker shutdown.
*/
private void shutdown() throws IgniteInterruptedCheckedException {
- isCancelled = true;
+ isCancelled.set(true);
U.join(this);
}
@@ -3325,7 +3325,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
public void restart() {
assert runner() == null : "FileCleaner is still running";
- isCancelled = false;
+ isCancelled.set(false);
new IgniteThread(this).start();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index a836be2fc5c..b901118a34f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -403,7 +403,7 @@ public class FileHandleManagerImpl implements FileHandleManager {
unparkWaiters(MAX_VALUE);
- if (err == null && !isCancelled)
+ if (err == null && !isCancelled.get())
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
@@ -583,7 +583,7 @@ public class FileHandleManagerImpl implements FileHandleManager {
public void restart() {
assert runner() == null : "WALWriter is still running.";
- isCancelled = false;
+ isCancelled.set(false);
new IgniteThread(this).start();
}
@@ -640,7 +640,7 @@ public class FileHandleManagerImpl implements FileHandleManager {
public void restart() {
assert runner() == null : "WalSegmentSyncer is running.";
- isCancelled = false;
+ isCancelled.set(false);
new IgniteThread(walSegmentSyncWorker).start();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
index 5316f627c99..5bae8fb052f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageL
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.ACTUALIZE;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.CLUSTER_WIDE_UPDATE;
@@ -124,7 +125,7 @@ public class DistributedConfigurationProcessor extends GridProcessorAdapter impl
* @param propKey Key of specific property.
* @return Property key for meta storage.
*/
- private static String toMetaStorageKey(String propKey) {
+ public static String toMetaStorageKey(String propKey) {
return DIST_CONF_PREFIX + propKey;
}
@@ -169,7 +170,7 @@ public class DistributedConfigurationProcessor extends GridProcessorAdapter impl
* @param name Property name.
* @return Public property.
*/
- public DistributedChangeableProperty<Serializable> property(String name) {
+ public @Nullable DistributedChangeableProperty<Serializable> property(String name) {
DistributedChangeableProperty<?> p = props.get(name);
if (!(p instanceof DistributedChangeableProperty))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 122b2ea66e7..71e62477676 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
@@ -87,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
@@ -113,9 +115,11 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty.detachedLongProperty;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.CPU_LOAD;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
@@ -166,6 +170,12 @@ public class GridJobProcessor extends GridProcessorAdapter {
/** Total jobs waiting time metric name. */
public static final String WAITING_TIME = "WaitingTime";
+ /**
+ * Name of the distributed property that defines the timeout (in millis) for interrupting a
+ * {@link GridJobWorker worker} after its {@link GridJobWorker#cancel() cancellation}.
+ */
+ public static final String COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT = "computeJobWorkerInterruptTimeout";
+
/** */
private final Marshaller marsh;
@@ -312,6 +322,10 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
@Nullable private final String jobPriAttrKey;
+ /** Timeout (in millis) for interrupting {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel}. */
+ private final DistributedLongProperty computeJobWorkerInterruptTimeout =
+ detachedLongProperty(COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT);
+
/**
* @param ctx Kernal context.
*/
@@ -378,6 +392,15 @@ public class GridJobProcessor extends GridProcessorAdapter {
taskPriAttrKey = null;
jobPriAttrKey = null;
}
+
+ ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
+ computeJobWorkerInterruptTimeout.addListener(makeUpdateListener(
+ "Compute job parameter '%s' was changed from '%s' to '%s'",
+ log
+ ));
+
+ dispatcher.registerProperty(computeJobWorkerInterruptTimeout);
+ });
}
/** {@inheritDoc} */
@@ -878,7 +901,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
* In most cases this method should be called from main read lock
* to avoid jobs activation after node stop has started.
*/
- private void handleCollisions() {
+ public void handleCollisions() {
assert !jobAlwaysActivate;
if (handlingCollision.get()) {
@@ -1327,7 +1350,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
holdLsnr,
partsReservation,
req.getTopVer(),
- req.executorName());
+ req.executorName(),
+ this::computeJobWorkerInterruptTimeout
+ );
jobCtx.job(job);
@@ -2414,4 +2439,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
else
return w -> sesId.equals(w.getSession().getId()) && jobId.equals(w.getJobId());
}
+
+ /**
+ * @return Interruption timeout of {@link GridJobWorker workers} (in millis) after {@link GridWorker#cancel cancel} is called; falls back to the configured failure detection timeout if the property has no value.
+ */
+ public long computeJobWorkerInterruptTimeout() {
+ return computeJobWorkerInterruptTimeout.getOrDefault(ctx.config().getFailureDetectionTimeout());
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 0495c6ef8ec..e7b0caea536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -92,11 +93,7 @@ import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.SUS
*/
public class GridJobWorker extends GridWorker implements GridTimeoutObject {
/** Per-thread held flag. */
- private static final ThreadLocal<Boolean> HOLD = new ThreadLocal<Boolean>() {
- @Override protected Boolean initialValue() {
- return false;
- }
- };
+ private static final ThreadLocal<Boolean> HOLD = ThreadLocal.withInitial(() -> false);
/** Static logger to avoid re-creation. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -189,6 +186,13 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
private volatile ComputeJobStatusEnum status = QUEUED;
/**
+ * Supplier of the timeout (in millis) for interrupting {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel}.
+ */
+ private final LongSupplier jobInterruptTimeoutSupplier;
+
+ /**
+ * Constructor.
+ *
* @param ctx Kernal context.
* @param dep Grid deployment.
* @param createTime Create time.
@@ -203,6 +207,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
* @param partsReservation Reserved partitions (must be released at the job finish).
* @param reqTopVer Affinity topology version of the job request.
* @param execName Custom executor name.
+ * @param jobInterruptTimeoutSupplier Supplier of the timeout (in millis) for interrupting
+ * {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel}.
*/
GridJobWorker(
GridKernalContext ctx,
@@ -218,10 +224,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
GridJobHoldListener holdLsnr,
GridReservable partsReservation,
AffinityTopologyVersion reqTopVer,
- String execName) {
+ String execName,
+ LongSupplier jobInterruptTimeoutSupplier
+ ) {
super(ctx.igniteInstanceName(), "grid-job-worker", ctx.log(GridJobWorker.class));
- assert ctx != null;
assert ses != null;
assert jobCtx != null;
assert taskNode != null;
@@ -242,6 +249,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
this.partsReservation = partsReservation;
this.reqTopVer = reqTopVer;
this.execName = execName;
+ this.jobInterruptTimeoutSupplier = jobInterruptTimeoutSupplier;
if (job != null)
this.job = job;
@@ -1160,6 +1168,64 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
return jobId.hashCode();
}
+ /** {@inheritDoc} Only the first cancel request is handled; repeated requests are just logged at debug level. */
+ @Override protected void onCancel(boolean firstCancelRequest) {
+ if (firstCancelRequest)
+ handleCancel();
+ else {
+ if (log.isDebugEnabled()) {
+ Thread runner = runner(); // May be null if the worker is not running on a thread right now.
+
+ log.debug(String.format(
+ "Worker cancellation is ignored [jobId=%s, interrupted=%s]",
+ getJobId(),
+ runner == null ? "unknown" : runner.isInterrupted()
+ ));
+ }
+ }
+ }
+
+ /** {@inheritDoc} Applies the same cancellation handling as a regular cancel request. */
+ @Override protected void onCancelledBeforeWorkerScheduled() {
+ // Cancellation arrived before execution started; handle it here so the worker does not hang once it runs.
+ handleCancel();
+ }
+
+ /**
+ * Handles worker cancellation: a positive timeout schedules a delayed interrupt via the timeout processor, otherwise the runner thread (if any) is interrupted immediately.
+ */
+ private void handleCancel() {
+ long timeout = jobInterruptTimeoutSupplier.getAsLong(); // Read on every cancellation, so the current value is used.
+
+ if (timeout > 0) {
+ ctx.timeout().addTimeoutObject( // Not removed if the job finishes first; the timeout object then checks isDone() before interrupting.
+ new JobWorkerInterruptionTimeoutObject(this, U.currentTimeMillis() + timeout)
+ );
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Worker will be interrupted later [jobId=%s, timeout=%s]",
+ getJobId(),
+ U.humanReadableDuration(timeout)
+ ));
+ }
+ }
+ else {
+ Thread runner = runner(); // Null if the worker is not running on a thread right now.
+
+ if (runner != null)
+ runner.interrupt();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Worker is interrupted on cancel [jobId=%s, interrupted=%s]",
+ getJobId(),
+ runner == null ? "unknown" : runner.isInterrupted() // Interrupt flag as observed at logging time.
+ ));
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridJobWorker.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/JobWorkerInterruptionTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/JobWorkerInterruptionTimeoutObject.java
new file mode 100644
index 00000000000..3e5b37bf69e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/JobWorkerInterruptionTimeoutObject.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.job;
+
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Timeout object for delayed {@link GridJobWorker worker} interruption.
+ *
+ * <p>After {@link GridJobWorker#cancel} is called the worker is expected to complete gracefully;
+ * if it is still running at {@link #endTime()}, its runner thread is {@link Thread#interrupt interrupted}.
+ */
+public class JobWorkerInterruptionTimeoutObject implements GridTimeoutObject {
+ /** Compute job worker. */
+ private final GridJobWorker jobWorker;
+
+ /** Random ID of this timeout object. */
+ private final IgniteUuid id;
+
+ /** Time (in millis) at which this timeout object fires. */
+ private final long endTime;
+
+ /**
+ * Constructor.
+ *
+ * @param jobWorker Compute job worker.
+ * @param endTime Time (in millis) at which this timeout object fires.
+ */
+ public JobWorkerInterruptionTimeoutObject(
+ GridJobWorker jobWorker,
+ long endTime
+ ) {
+ this.jobWorker = jobWorker;
+ this.endTime = endTime;
+
+ id = IgniteUuid.randomUuid();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} Interrupts the runner thread of the cancelled worker if the worker has not finished yet. */
+ @Override public void onTimeout() {
+ assert jobWorker.isCancelled() : jobWorker;
+
+ Thread runner = jobWorker.runner();
+
+ if (runner != null && !jobWorker.isDone()) // NOTE(review): check and interrupt are not atomic; confirm the runner thread cannot move on to other work in between.
+ runner.interrupt();
+ }
+
+ /**
+ * @return Compute job worker.
+ */
+ public GridJobWorker jobWorker() {
+ return jobWorker;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(JobWorkerInterruptionTimeoutObject.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
index ebc5c73bc5b..bb973857e8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
@@ -100,7 +100,7 @@ public class DmsDataWriterWorker extends GridWorker {
/** Start new distributed metastorage worker thread. */
public void start() {
- isCancelled = false;
+ isCancelled.set(false);
new IgniteThread(igniteInstanceName(), "dms-writer-thread", this).start();
}
@@ -187,7 +187,7 @@ public class DmsDataWriterWorker extends GridWorker {
updateQueue.offer(new FutureTask<>(() -> STOP));
latch.countDown();
- isCancelled = true;
+ isCancelled.set(true);
Thread runner = runner();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java
index 692bad5aa1d..7f7b730126a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java
@@ -26,15 +26,15 @@ public interface GridTimeoutObject {
/**
* @return ID of the object.
*/
- public IgniteUuid timeoutId();
+ IgniteUuid timeoutId();
/**
* @return End time.
*/
- public long endTime();
+ long endTime();
/**
* Timeout callback.
*/
- public void onTimeout();
+ void onTimeout();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 7efcea9c8a8..f0b3d3b94ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -296,7 +296,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
throw t;
}
finally {
- if (err == null && !isCancelled)
+ if (err == null && !isCancelled.get())
err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly.");
if (err instanceof OutOfMemoryError)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 50d9b9ef399..bff78e8a3d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -658,7 +658,7 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
}
}
- if (!isCancelled) {
+ if (!isCancelled.get()) {
errHnd.apply(new IllegalStateException("Thread " + Thread.currentThread().getName() +
" is terminated unexpectedly"));
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 50ff443a9d3..2f3e8e55295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2276,7 +2276,7 @@ public class GridNioServer<T> {
}
// select() call above doesn't throw on interruption; checking it here to propagate timely.
- if (!closed && !isCancelled && Thread.interrupted())
+ if (!closed && !isCancelled.get() && Thread.interrupted())
throw new InterruptedException();
}
finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
index 615d5062a94..85959950f33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.worker;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
@@ -49,7 +50,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
private volatile boolean finished;
/** Whether or not this runnable is cancelled. */
- protected volatile boolean isCancelled;
+ protected final AtomicBoolean isCancelled = new AtomicBoolean();
/** Actual thread runner. */
private volatile Thread runner;
@@ -114,9 +115,8 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
log.debug("Grid runnable started: " + name);
try {
- // Special case, when task gets cancelled before it got scheduled.
- if (isCancelled)
- runner.interrupt();
+ if (isCancelled.get())
+ onCancelledBeforeWorkerScheduled();
// Listener callback.
if (lsnr != null)
@@ -160,7 +160,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
lsnr.onStopped(this);
if (log.isDebugEnabled())
- if (isCancelled)
+ if (isCancelled.get())
log.debug("Grid runnable finished due to cancellation: " + name);
else if (runner.isInterrupted())
log.debug("Grid runnable finished due to interruption without cancellation: " + name);
@@ -191,9 +191,9 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
}
/**
- * @return Runner thread.
+ * @return Runner thread, {@code null} if the worker has not yet started executing.
*/
- public Thread runner() {
+ public @Nullable Thread runner() {
return runner;
}
@@ -216,20 +216,13 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
}
/**
- * Cancels this runnable interrupting actual runner.
+ * Cancels this runnable.
*/
public void cancel() {
if (log.isDebugEnabled())
log.debug("Cancelling grid runnable: " + this);
- isCancelled = true;
-
- Thread runner = this.runner;
-
- // Cannot apply Future.cancel() because if we do, then Future.get() would always
- // throw CancellationException and we would not be able to wait for task completion.
- if (runner != null)
- runner.interrupt();
+ onCancel(isCancelled.compareAndSet(false, true));
}
/**
@@ -241,7 +234,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
if (log.isDebugEnabled())
log.debug("Joining grid runnable: " + this);
- if ((runner == null && isCancelled) || finished)
+ if ((runner == null && isCancelled.get()) || finished)
return;
synchronized (mux) {
@@ -259,7 +252,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
public boolean isCancelled() {
Thread runner = this.runner;
- return isCancelled || (runner != null && runner.isInterrupted());
+ return isCancelled.get() || (runner != null && runner.isInterrupted());
}
/**
@@ -306,6 +299,31 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
lsnr.onIdle(this);
}
+ /**
+ * Hook invoked by {@link #cancel()}; the default implementation interrupts the runner thread, if one is set.
+ *
+ * @param firstCancelRequest {@code true} if this call is the first cancellation request for the worker.
+ */
+ protected void onCancel(boolean firstCancelRequest) {
+ // Interrupt instead of Future.cancel(): the latter would make Future.get() always throw
+ // CancellationException, leaving no way to wait for the task to actually complete.
+ Thread t = this.runner;
+
+ if (t != null)
+ t.interrupt();
+ }
+
+ /**
+ * Hook for the case when the worker was cancelled before it started executing; interrupts the runner by default.
+ */
+ protected void onCancelledBeforeWorkerScheduled() {
+ Thread t = runner;
+
+ assert t != null : this;
+
+ t.interrupt();
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
Thread runner = this.runner;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
index b921106d13c..a11ecd762ee 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
@@ -60,6 +60,9 @@ public class GridCancelOnGridStopSelfTest extends GridCommonAbstractTest {
cancelCall = false;
try (Ignite g = startGrid(1)) {
+ // We change it because compute jobs will go to sleep.
+ assertTrue(computeJobWorkerInterruptTimeout(g).propagate(10L));
+
cnt = new CountDownLatch(1);
g.compute().executeAsync(CancelledTask.class, null);
@@ -67,7 +70,7 @@ public class GridCancelOnGridStopSelfTest extends GridCommonAbstractTest {
cnt.await();
}
- assert cancelCall;
+ assertTrue(cancelCall);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java
index 1c44ad125b6..564793b93ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java
@@ -43,6 +43,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
/**
* Cancel unused job test.
*/
@@ -93,31 +96,32 @@ public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest {
public void testCancel() throws Exception {
Ignite ignite = G.ignite(getTestIgniteInstanceName());
+ // We change it because compute jobs will go to sleep.
+ assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
+
ignite.compute().localDeployTask(GridCancelTestTask.class, U.detectClassLoader(GridCancelTestTask.class));
ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridCancelTestTask.class.getName(), null);
- // Wait until jobs begin execution.
- boolean await = startSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+ assertNotNull(fut);
- assert await : "Jobs did not start.";
+ // Wait until jobs begin execution.
+ assertTrue("Jobs did not start.", startSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS));
info("Test task result: " + fut);
- assert fut != null;
-
// Only first job should successfully complete.
- Object res = fut.get();
- assert (Integer)res == 1;
+ assertThat(fut.get(getTestTimeout()), equalTo(1));
// Wait for all jobs to finish.
- await = stopSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS);
- assert await : "Jobs did not stop.";
+ assertTrue("Jobs did not stop.", stopSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS));
// One is definitely processed. But there might be some more processed or cancelled or processed and cancelled.
// Thus total number should be at least SPLIT_COUNT and at most (SPLIT_COUNT - 1) *2 +1
- assert (cancelCnt + processedCnt) >= SPLIT_COUNT && (cancelCnt + processedCnt) <= (SPLIT_COUNT - 1) * 2 + 1 :
- "Invalid cancel count value: " + cancelCnt;
+ assertTrue(
+ "Invalid cancel count value: " + cancelCnt,
+ (cancelCnt + processedCnt) >= SPLIT_COUNT && (cancelCnt + processedCnt) <= (SPLIT_COUNT - 1) * 2 + 1
+ );
}
/**
@@ -171,6 +175,8 @@ public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest {
private ComputeTaskSession ses;
/**
+ * Constructor.
+ *
* @param arg Argument.
*/
private GridCancelTestJob(Integer arg) {
@@ -179,7 +185,7 @@ public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public Serializable execute() {
- int arg = this.<Integer>argument(0);
+ int arg = argument(0);
try {
if (log.isInfoEnabled())
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java
index f5c7fc9175f..0547a01bc71 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java
@@ -31,7 +31,6 @@ import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.resources.LoggerResource;
@@ -44,11 +43,14 @@ import org.apache.ignite.spi.collision.CollisionJobContext;
import org.apache.ignite.spi.collision.CollisionSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
/**
* Cancelled jobs metrics self test.
*/
@@ -85,25 +87,24 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
public void testCancelledJobs() throws Exception {
final Ignite ignite = G.ignite(getTestIgniteInstanceName());
+ // We change it because compute jobs will go to sleep.
+ assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
+
Collection<ComputeTaskFuture<?>> futs = new ArrayList<>();
for (int i = 1; i <= 10; i++)
futs.add(ignite.compute().executeAsync(CancelledTask.class, null));
// Wait to be sure that metrics were updated.
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return ignite.cluster().localNode().metrics().getTotalCancelledJobs() > 0;
- }
- }, 5000);
+ waitForCondition(() -> ignite.cluster().localNode().metrics().getTotalCancelledJobs() > 0, 5000);
colSpi.externalCollision();
for (ComputeTaskFuture<?> fut : futs) {
try {
- fut.get();
+ fut.get(getTestTimeout());
- assert false : "Job was not interrupted.";
+ fail("Job was not interrupted.");
}
catch (IgniteException e) {
if (e.hasCause(InterruptedException.class))
@@ -114,9 +115,7 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
}
// Job was cancelled and now we need to calculate metrics.
- int totalCancelledJobs = ignite.cluster().localNode().metrics().getTotalCancelledJobs();
-
- assert totalCancelledJobs == 10 : "Metrics were not updated. Expected 10 got " + totalCancelledJobs;
+ assertThat(ignite.cluster().localNode().metrics().getTotalCancelledJobs(), equalTo(10));
}
/**
@@ -130,7 +129,7 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public Object reduce(List<ComputeJobResult> results) {
- assert results.get(0).isCancelled() : "Wrong job result status.";
+ assertFalse(results.get(0).isCancelled());
return null;
}
@@ -148,13 +147,6 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException ignored) {
- try {
- Thread.sleep(1000);
- }
- catch (InterruptedException e1) {
- throw new IgniteException("Unexpected exception: ", e1);
- }
-
throw new IgniteException("Job got interrupted while waiting for cancellation.");
}
finally {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java
index 73e53bf15af..fc8371bf081 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java
@@ -43,6 +43,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
/**
* Test failover and custom topology. Topology returns local node if remote node fails.
*/
@@ -87,11 +90,10 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
@Test
public void testFailoverTopology() throws Exception {
try {
- Ignite ignite1 = startGrid(1);
- Ignite ignite2 = startGrid(2);
+ Ignite ignite1 = startGrids(2);
- assert ignite1 != null;
- assert ignite2 != null;
+ // We change it because compute jobs will go to sleep.
+ assertTrue(computeJobWorkerInterruptTimeout(ignite1).propagate(10L));
ignite1.compute().localDeployTask(JobTask.class, JobTask.class.getClassLoader());
@@ -104,9 +106,9 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
mux.wait();
}
- stopAndCancelGrid(2);
+ stopAndCancelGrid(1);
- String res = fut.get();
+ String res = fut.get(getTestTimeout());
info("Task result: " + res);
}
@@ -116,13 +118,10 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
info("Failed over: " + failCnt.get());
- assert failCnt.get() == 1 : "Invalid fail over counter [expected=1, actual=" + failCnt.get() + ']';
+ assertThat(failCnt.get(), equalTo(1));
}
finally {
- stopGrid(1);
-
- // Stopping stopped instance just in case.
- stopGrid(2);
+ stopAllGrids();
}
}
@@ -139,11 +138,11 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) {
- assert ignite != null;
+ assertNotNull(ignite);
UUID locNodeId = ignite.configuration().getNodeId();
- assert locNodeId != null;
+ assertNotNull(locNodeId);
if (log.isInfoEnabled())
log.info("Mapping jobs [subgrid=" + subgrid + ", arg=" + arg + ']');
@@ -167,7 +166,7 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
UUID nodeId = ignite.configuration().getNodeId();
- assert nodeId != null;
+ assertNotNull(nodeId);
if (!nodeId.equals(argument(0))) {
try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
index be451625462..2ede9cbb4fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
@@ -73,6 +73,9 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
stopAllGrids();
ignite = startGridsMultiThreaded(2);
+
+ // We are changing it because compute jobs fall asleep.
+ assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java
index 522ed7786a8..9fc28dbca87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java
@@ -71,6 +71,9 @@ public class GridStopWithCancelSelfTest extends GridCommonAbstractTest {
try {
Ignite ignite = startGrid("testGrid");
+ // We are changing it because compute jobs fall asleep.
+ assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
+
executeAsync(ignite.compute(), CancelledTask.class, null);
cnt.await();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java
index 5f9d9bd5669..606b5d26ff5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java
@@ -40,6 +40,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+
/**
* Test for task future when grid stops.
*/
@@ -72,17 +75,17 @@ public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest {
public void testGet() throws Exception {
Ignite ignite = startGrid(getTestIgniteInstanceName());
+ // We change it because compute jobs fall asleep.
+ assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
+
Thread futThread = null;
try {
final ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridStopTestTask.class.getName(), null);
- fut.listen(new CI1<IgniteFuture>() {
- @SuppressWarnings({"NakedNotify"})
- @Override public void apply(IgniteFuture gridFut) {
- synchronized (mux) {
- mux.notifyAll();
- }
+ fut.listen((CI1<IgniteFuture>)gridFut -> {
+ synchronized (mux) {
+ mux.notifyAll();
}
});
@@ -104,7 +107,7 @@ public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest {
failed.set(true);
// Make sure that message contains info about stopping grid.
- assert e.getMessage().startsWith("Task failed due to stopping of the grid:");
+ assertThat(e.getMessage(), startsWith("Task failed due to stopping of the grid:"));
}
finally {
latch.countDown();
@@ -135,7 +138,7 @@ public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest {
info("Test task result [failed=" + failed.get() + ", taskFuture=" + fut + ']');
- assert finished : "Future thread was not stopped.";
+ assertTrue(finished);
assert fut.isDone();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java
index f2475eab96a..6bd6daf901f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java
@@ -97,6 +97,9 @@ public class SystemViewComputeJobTest extends GridCommonAbstractTest {
cache = server.createCache("test-cache");
cache.put(1, 1);
+
+ // We are changing it because compute jobs fall asleep.
+ assertTrue(computeJobWorkerInterruptTimeout(server).propagate(10L));
}
/** Tests work of {@link SystemView} for compute grid {@link IgniteCompute#broadcastAsync(IgniteRunnable)} call. */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
index 17c783dc98a..e6ece4a4b3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
@@ -91,6 +91,9 @@ public class ComputeJobStatusTest extends GridCommonAbstractTest {
node0 = crd;
node1 = grid(1);
+
+ // We are changing it because compute jobs fall asleep.
+ assertTrue(computeJobWorkerInterruptTimeout(node0).propagate(10L));
}
/** {@inheritDoc} */
@@ -258,6 +261,9 @@ public class ComputeJobStatusTest extends GridCommonAbstractTest {
U.sleep(100);
checkTaskJobStatuses(sesId, FINISHED, null);
+
+ // Let's wait a bit for the callcc (above) to complete.
+ U.sleep(100);
}
// Let's check that the job (WaitJob) on the node0 has finished
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java
new file mode 100644
index 00000000000..0bd42eae51f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobSibling;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.internal.processors.job.GridJobWorker;
+import org.apache.ignite.internal.processors.job.JobWorkerInterruptionTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.toMetaStorageKey;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+/**
+ * {@link GridJobWorker} interrupt testing.
+ */
+public class InterruptComputeJobTest extends GridCommonAbstractTest {
+ /** Node shared by all tests of this class. */
+ private static IgniteEx node;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ stopAllGrids();
+
+ node = startGrid();
+
+ node.cluster().state(ACTIVE);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+
+ node = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ PriorityQueueCollisionSpiEx.collisionSpiEx(node).handleCollision = true;
+
+ // Reset the distributed property so each test starts from the default value.
+ node.context().distributedMetastorage().remove(
+ toMetaStorageKey(computeJobWorkerInterruptTimeout(node).getName())
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Release every job that may still be waiting on its latch and forget it.
+ CountDownLatchJob.JOBS.removeIf(job -> {
+ job.latch.countDown();
+
+ return true;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setCollisionSpi(new PriorityQueueCollisionSpiEx());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+ return new StopNodeFailureHandler();
+ }
+
+ /**
+ * Checks that method {@link GridJobProcessor#computeJobWorkerInterruptTimeout()}
+ * returns a valid value that depends on distributed property "computeJobWorkerInterruptTimeout".
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testComputeJobWorkerInterruptTimeoutProperty() throws Exception {
+ // Check default.
+ assertThat(
+ node.context().job().computeJobWorkerInterruptTimeout(),
+ equalTo(node.context().config().getFailureDetectionTimeout())
+ );
+
+ // Check update value.
+ assertTrue(computeJobWorkerInterruptTimeout(node).propagate(100500L));
+
+ assertThat(node.context().job().computeJobWorkerInterruptTimeout(), equalTo(100500L));
+ }
+
+ /**
+ * Checks that when {@link GridJobWorker#cancel()} (even twice) is called, the {@link GridJobWorker#runner()}
+ * is not interrupted and that only one {@link JobWorkerInterruptionTimeoutObject} is created.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCancel() throws Exception {
+ assertTrue(computeJobWorkerInterruptTimeout(node).propagate(TimeUnit.HOURS.toMillis(1)));
+
+ ComputeTaskFuture<Void> taskFut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);
+
+ GridJobWorker jobWorker = jobWorker(node, taskFut.getTaskSession());
+
+ cancelWithChecks(jobWorker);
+
+ cancelWithChecks(jobWorker);
+
+ ((CountDownLatchJob)jobWorker.getJob()).latch.countDown();
+
+ taskFut.get(getTestTimeout());
+ }
+
+ /**
+ * Checks that after {@link GridJobWorker#cancel()}, the {@link JobWorkerInterruptionTimeoutObject}
+ * will trigger the {@link Thread#interrupt()}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testInterrupt() throws Exception {
+ assertTrue(computeJobWorkerInterruptTimeout(node).propagate(100L));
+
+ ComputeTaskFuture<Void> taskFut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);
+
+ GridJobWorker jobWorker = jobWorker(node, taskFut.getTaskSession());
+
+ cancelWithChecks(jobWorker);
+
+ // Wait for the JobWorkerInterruptionTimeoutObject to interrupt the worker.
+ taskFut.get(1_000L);
+
+ assertThat(jobWorker.isCancelled(), equalTo(true));
+ assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(true));
+ assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), empty());
+ }
+
+ /**
+ * Checks that if the worker was {@link GridJobWorker#cancel()} (even twice) before starting work,
+ * then it will be cancelled, not interrupted, and have one {@link JobWorkerInterruptionTimeoutObject} before
+ * and two after the start.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCancelBeforeStart() throws Exception {
+ PriorityQueueCollisionSpiEx.collisionSpiEx(node).handleCollision = false;
+
+ assertTrue(computeJobWorkerInterruptTimeout(node).propagate(TimeUnit.HOURS.toMillis(1)));
+
+ ComputeTaskFuture<Void> taskFut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);
+
+ GridJobWorker jobWorker = jobWorker(node, taskFut.getTaskSession());
+
+ cancelBeforeStartWithChecks(jobWorker);
+
+ cancelBeforeStartWithChecks(jobWorker);
+
+ PriorityQueueCollisionSpiEx.collisionSpiEx(node).handleCollision = true;
+
+ node.context().job().handleCollisions();
+
+ assertTrue(waitForCondition(jobWorker::isStarted, getTestTimeout(), 10));
+
+ assertThat(jobWorker.isCancelled(), equalTo(true));
+ assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(false));
+ assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), hasSize(2));
+ }
+
+ /**
+ * @param n Node.
+ * @param taskSession Task session.
+ * @return The only job worker of the task session, looked up among active jobs first, then passive ones.
+ */
+ private static GridJobWorker jobWorker(IgniteEx n, ComputeTaskSession taskSession) {
+ Collection<ComputeJobSibling> siblings = taskSession.getJobSiblings();
+
+ assertThat(siblings, hasSize(1));
+
+ IgniteUuid jobId = F.first(siblings).getJobId();
+
+ GridJobWorker jobWorker = n.context().job().activeJob(jobId);
+
+ if (jobWorker == null) {
+ Map<IgniteUuid, GridJobWorker> passiveJobs = getFieldValue(n.context().job(), "passiveJobs");
+
+ if (passiveJobs != null)
+ jobWorker = passiveJobs.get(jobId);
+ }
+
+ assertThat(jobWorker, notNullValue());
+
+ return jobWorker;
+ }
+
+ /**
+ * Cancels the started worker, checking that it is cancelled, not interrupted, and has exactly one interrupter.
+ *
+ * @param jobWorker Compute job worker.
+ * @throws Exception If failed.
+ */
+ private void cancelWithChecks(GridJobWorker jobWorker) throws Exception {
+ assertTrue(waitForCondition(jobWorker::isStarted, getTestTimeout(), 10));
+
+ jobWorker.cancel();
+
+ assertThat(jobWorker.isCancelled(), equalTo(true));
+ assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(false));
+ assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), hasSize(1));
+ }
+
+ /**
+ * Cancels the worker before it starts, checking that it is cancelled and has exactly one interrupter.
+ *
+ * @param jobWorker Compute job worker.
+ */
+ private void cancelBeforeStartWithChecks(GridJobWorker jobWorker) {
+ jobWorker.cancel();
+
+ assertThat(jobWorker.isStarted(), equalTo(false));
+ assertThat(jobWorker.runner(), nullValue());
+
+ assertThat(jobWorker.isCancelled(), equalTo(true));
+ assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), hasSize(1));
+ }
+
+ /**
+ * @param n Node.
+ * @return Value of {@code GridTimeoutProcessor#timeoutObjs}.
+ */
+ private static GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjects(IgniteEx n) {
+ return getFieldValue(n.context().timeout(), "timeoutObjs");
+ }
+
+ /**
+ * @param timeoutObjects Value of {@code GridTimeoutProcessor#timeoutObjs}.
+ * @param jobWorker Compute job worker.
+ * @return Collection of {@link JobWorkerInterruptionTimeoutObject} for {@code jobWorker}.
+ */
+ private static Collection<JobWorkerInterruptionTimeoutObject> jobWorkerInterrupters(
+ GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjects,
+ GridJobWorker jobWorker
+ ) {
+ return timeoutObjects.stream()
+ .filter(JobWorkerInterruptionTimeoutObject.class::isInstance)
+ .map(JobWorkerInterruptionTimeoutObject.class::cast)
+ .filter(o -> o.jobWorker() == jobWorker)
+ .collect(toList());
+ }
+
+ /**
+ * @return Value of {@link CountDownLatchJob#interrupted} for the job of {@code jobWorker}.
+ */
+ private static boolean countDownLatchJobInterrupted(GridJobWorker jobWorker) {
+ return ((CountDownLatchJob)jobWorker.getJob()).interrupted;
+ }
+
+ /**
+ * Test extension {@link PriorityQueueCollisionSpi}.
+ */
+ private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
+ /** If {@code false}, {@link #onCollision} does nothing. */
+ volatile boolean handleCollision = true;
+
+ /** {@inheritDoc} */
+ @Override public void onCollision(CollisionContext ctx) {
+ if (handleCollision)
+ super.onCollision(ctx);
+ }
+
+ /**
+ * @param n Node.
+ * @return Test extension {@link PriorityQueueCollisionSpi}.
+ */
+ static PriorityQueueCollisionSpiEx collisionSpiEx(IgniteEx n) {
+ return ((PriorityQueueCollisionSpiEx)n.configuration().getCollisionSpi());
+ }
+ }
+
+ /**
+ * Task that creates jobs.
+ */
+ private static class ComputeTask extends ComputeTaskAdapter<Object, Void> {
+ /** Compute job class. */
+ final Class<? extends ComputeJobAdapter> jobClass;
+
+ /**
+ * Constructor.
+ *
+ * @param jobClass Compute job class.
+ */
+ ComputeTask(Class<? extends ComputeJobAdapter> jobClass) {
+ this.jobClass = jobClass;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode> map(
+ List<ClusterNode> subgrid,
+ @Nullable Object arg
+ ) throws IgniteException {
+ return subgrid.stream().collect(toMap(n -> newComputeJobInstance(arg), identity()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+ return null;
+ }
+
+ /**
+ * @param arg Argument.
+ * @return New instance of {@link #jobClass}.
+ */
+ private ComputeJobAdapter newComputeJobInstance(@Nullable Object arg) {
+ try {
+ if (arg == null)
+ return jobClass.getDeclaredConstructor().newInstance();
+
+ return jobClass.getDeclaredConstructor(arg.getClass()).newInstance(arg);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+ * A job that is waiting for the latch counter to decrement in order to complete its work.
+ */
+ private static class CountDownLatchJob extends ComputeJobAdapter {
+ /** All created jobs; released and cleared after each test. */
+ static final Collection<CountDownLatchJob> JOBS = new ConcurrentLinkedQueue<>();
+
+ /** Latch the job waits on; counting it down lets the job complete. */
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ /** Whether {@link #execute()} was interrupted while waiting on {@link #latch}. */
+ volatile boolean interrupted;
+
+ /**
+ * Constructor.
+ */
+ public CountDownLatchJob() {
+ JOBS.add(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ interrupted = true;
+
+ Thread.currentThread().interrupt();
+ }
+
+ return null;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java
index 45d4951a763..f9cf4fc9a1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java
@@ -47,6 +47,10 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
/**
* Test of session siblings cancellation from future.
*/
@@ -73,7 +77,7 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
/**
*
*/
- public GridSessionCancelSiblingsFromFutureSelfTest() {
+ public GridSessionCancelSiblingsFromFutureSelfTest() throws Exception {
super(true);
}
@@ -92,6 +96,14 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
return c;
}
+    /** {@inheritDoc} */
+    @Override protected void beforeFirstTest() throws Exception {
+        super.beforeFirstTest();
+
+        // The jobs in this test sleep, so shorten the compute job worker interrupt timeout
+        // (how long a cancelled job is given before its worker is interrupted) to 10 ms.
+        assertTrue(
+            "Failed to change the compute job worker interrupt timeout.",
+            computeJobWorkerInterruptTimeout(G.ignite(getTestIgniteInstanceName())).propagate(10L)
+        );
+    }
+
/**
* @throws Exception if failed
*/
@@ -114,18 +126,16 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
final AtomicBoolean failed = new AtomicBoolean(false);
- GridTestUtils.runMultiThreaded(new Runnable() {
- @Override public void run() {
- int num = sNum.get();
+ GridTestUtils.runMultiThreaded(() -> {
+ int num = sNum.get();
- try {
- checkTask(num);
- }
- catch (Throwable e) {
- error("Failed to execute task.", e);
+ try {
+ checkTask(num);
+ }
+ catch (Throwable e) {
+ error("Failed to execute task.", e);
- failed.set(true);
- }
+ failed.set(true);
}
}, EXEC_COUNT, "grid-session-test");
@@ -143,13 +153,11 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridTaskSessionTestTask.class, num);
- assert fut != null;
+ assertNotNull(fut);
try {
// Wait until jobs begin execution.
- boolean await = startSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
-
- assert await : "Jobs did not start.";
+ assertTrue("Jobs did not start.", startSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
Collection<ComputeJobSibling> jobSiblings = fut.getTaskSession().getJobSiblings();
@@ -158,23 +166,19 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
jobSibling.cancel();
}
- Object res = fut.get();
+ Object res = fut.get(getTestTimeout());
- assert "interrupt-task-data".equals(res) : "Invalid task result: " + res;
+ assertThat(res, equalTo("interrupt-task-data"));
// Wait for all jobs to finish.
- await = stopSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
-
- assert await : "Jobs did not cancel.";
-
- int cnt = interruptCnt[num].get();
+ assertTrue("Jobs did not cancel.", stopSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
- assert cnt == SPLIT_COUNT : "Invalid interrupt count value: " + cnt;
+ assertThat(interruptCnt[num].get(), equalTo(SPLIT_COUNT));
}
finally {
// We must wait for the jobs to be sure that they have completed
// their execution since they use static variable (shared for the tests).
- fut.get();
+ fut.get(getTestTimeout());
}
}
@@ -213,11 +217,11 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
if (log.isInfoEnabled())
log.info("Splitting jobs [task=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
- assert arg != null;
+ assertNotNull(arg);
taskNum = (Integer)arg;
- assert taskNum != -1;
+ assertThat(taskNum, not(equalTo(-1)));
Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);
@@ -226,7 +230,7 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
private volatile Thread thread;
@Override public Serializable execute() {
- assert taskSes != null;
+ assertNotNull(taskSes);
thread = Thread.currentThread();
@@ -255,7 +259,7 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
}
@Override public void cancel() {
- assert thread != null;
+ assertNotNull(thread);
interruptCnt[taskNum].incrementAndGet();
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java
index 5b24d3a7061..393a1d109b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java
@@ -51,6 +51,10 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
/**
* Session cancellation tests.
*/
@@ -94,6 +98,14 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
return c;
}
+    /** {@inheritDoc} */
+    @Override protected void beforeFirstTest() throws Exception {
+        super.beforeFirstTest();
+
+        // The jobs in this test sleep, so shorten the compute job worker interrupt timeout
+        // (how long a cancelled job is given before its worker is interrupted) to 10 ms.
+        assertTrue(
+            "Failed to change the compute job worker interrupt timeout.",
+            computeJobWorkerInterruptTimeout(G.ignite(getTestIgniteInstanceName())).propagate(10L)
+        );
+    }
+
/**
* @throws Exception If failed.
*/
@@ -116,18 +128,16 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
final AtomicBoolean failed = new AtomicBoolean(false);
- GridTestUtils.runMultiThreaded(new Runnable() {
- @Override public void run() {
- int num = sNum.get();
+ GridTestUtils.runMultiThreaded(() -> {
+ int num = sNum.get();
- try {
- checkTask(num);
- }
- catch (Throwable e) {
- error("Failed to execute task.", e);
+ try {
+ checkTask(num);
+ }
+ catch (Throwable e) {
+ error("Failed to execute task.", e);
- failed.set(true);
- }
+ failed.set(true);
}
}, EXEC_COUNT, "grid-session-test");
@@ -145,26 +155,22 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridTaskSessionTestTask.class, num);
- boolean await = startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
+ assertNotNull(fut);
// Wait until jobs begin execution.
- assert await : "Jobs did not start.";
-
- assert fut != null;
+ assertTrue("Jobs did not start.", startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
- Object res = fut.get();
+ Object res = fut.get(getTestTimeout());
- assert "interrupt-task-data".equals(res) : "Invalid task result: " + res;
-
- await = stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
+ assertThat(res, equalTo("interrupt-task-data"));
// Wait for all jobs to finish.
- assert await :
- "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']';
-
- int cnt = interruptCnt[num].get();
+ assertTrue(
+ "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']',
+ stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS)
+ );
- assert cnt == SPLIT_COUNT - 1 : "Invalid interrupt count value: " + cnt;
+ assertThat(interruptCnt[num].get(), equalTo(SPLIT_COUNT - 1));
}
/** */
@@ -203,11 +209,11 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
if (log.isInfoEnabled())
log.info("Splitting job [task=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
- assert arg != null;
+ assertNotNull(arg);
taskNum = (Integer)arg;
- assert taskNum != -1;
+ assertThat(taskNum, not(equalTo(-1)));
Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);
@@ -222,7 +228,7 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
/** {@inheritDoc} */
@Override public Object execute() {
- assert taskSes != null;
+ assertNotNull(taskSes);
thread = Thread.currentThread();
@@ -241,7 +247,7 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
if (log.isInfoEnabled())
log.info("Job one is proceeding [jobId=" + jobId + ']');
- assert jobId != null;
+ assertNotNull(jobId);
Collection<ComputeJobSibling> jobSiblings = taskSes.getJobSiblings();
@@ -270,7 +276,7 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
/** {@inheritDoc} */
@Override public void cancel() {
- assert thread != null;
+ assertNotNull(thread);
interruptCnt[taskNum].incrementAndGet();
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java
index abb9dac732e..6f41ea29be8 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java
@@ -49,6 +49,10 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
/**
* Session cancellation tests.
*/
@@ -92,6 +96,14 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
return c;
}
+    /** {@inheritDoc} */
+    @Override protected void beforeFirstTest() throws Exception {
+        super.beforeFirstTest();
+
+        // The jobs in this test sleep, so shorten the compute job worker interrupt timeout
+        // (how long a cancelled job is given before its worker is interrupted) to 10 ms.
+        assertTrue(
+            "Failed to change the compute job worker interrupt timeout.",
+            computeJobWorkerInterruptTimeout(G.ignite(getTestIgniteInstanceName())).propagate(10L)
+        );
+    }
+
/**
* @throws Exception If failed.
*/
@@ -114,18 +126,16 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
final AtomicBoolean failed = new AtomicBoolean(false);
- GridTestUtils.runMultiThreaded(new Runnable() {
- @Override public void run() {
- int num = sNum.get();
+ GridTestUtils.runMultiThreaded(() -> {
+ int num = sNum.get();
- try {
- checkTask(num);
- }
- catch (Throwable e) {
- error("Failed to execute task.", e);
+ try {
+ checkTask(num);
+ }
+ catch (Throwable e) {
+ error("Failed to execute task.", e);
- failed.set(true);
- }
+ failed.set(true);
}
}, EXEC_COUNT, "grid-session-test");
@@ -143,32 +153,28 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridTaskSessionTestTask.class, num);
- assert fut != null;
+ assertNotNull(fut);
try {
// Wait until jobs begin execution.
- boolean await = startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
-
- assert await : "Jobs did not start.";
+ assertTrue("Jobs did not start.", startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
- Object res = fut.get();
+ Object res = fut.get(getTestTimeout());
- assert "interrupt-task-data".equals(res) : "Invalid task result: " + res;
+ assertThat(res, equalTo("interrupt-task-data"));
// Wait for all jobs to finish.
- await = stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
-
- assert await :
- "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']';
-
- int cnt = interruptCnt[num].get();
+ assertTrue(
+ "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']',
+ stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS)
+ );
- assert cnt == SPLIT_COUNT - 1 : "Invalid interrupt count value: " + cnt;
+ assertThat(interruptCnt[num].get(), equalTo(SPLIT_COUNT - 1));
}
finally {
// We must wait for the jobs to be sure that they have completed
// their execution since they use static variable (shared for the tests).
- fut.get();
+ fut.get(getTestTimeout());
}
}
@@ -208,11 +214,11 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
if (log.isInfoEnabled())
log.info("Splitting job [job=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
- assert arg != null;
+ assertNotNull(arg);
taskNum = (Integer)arg;
- assert taskNum != -1;
+ assertThat(taskNum, not(equalTo(-1)));
Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);
@@ -223,7 +229,7 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
/** {@inheritDoc} */
@Override public Serializable execute() {
- assert taskSes != null;
+ assertNotNull(taskSes);
thread = Thread.currentThread();
@@ -260,7 +266,7 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
/** {@inheritDoc} */
@Override public void cancel() {
- assert thread != null;
+ assertNotNull(thread);
interruptCnt[taskNum].incrementAndGet();
@@ -279,7 +285,7 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
IgniteUuid jobId = received.get(0).getJobContext().getJobId();
- assert jobId != null;
+ assertNotNull(jobId);
// Cancel all jobs except first job with argument 1.
for (ComputeJobSibling jobSibling : jobSiblings) {
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index eff81839ea8..19c11f3796e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1967,25 +1967,62 @@ public final class GridTestUtils {
* @return {@code true} if condition was achieved, {@code false} otherwise.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
*/
-    public static boolean waitForCondition(GridAbsPredicate cond, long timeout) throws IgniteInterruptedCheckedException {
+    public static boolean waitForCondition(
+        GridAbsPredicate cond,
+        long timeout
+    ) throws IgniteInterruptedCheckedException {
+        return waitForCondition(cond, timeout, DFLT_BUSYWAIT_SLEEP_INTERVAL);
+    }
+
+    /**
+     * Waits for condition, polling in busy wait loop.
+     *
+     * @param cond Condition to wait for.
+     * @param timeout Max time to wait in milliseconds.
+     * @param checkInterval Time to sleep between two consecutive condition checks, in milliseconds.
+     * @return {@code true} if condition was achieved, {@code false} otherwise.
+     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
+     */
+    public static boolean waitForCondition(
+        GridAbsPredicate cond,
+        long timeout,
+        long checkInterval
+    ) throws IgniteInterruptedCheckedException {
         long endTime = U.currentTimeMillis() + timeout;
         long endTime0 = endTime < 0 ? Long.MAX_VALUE : endTime;
-        return waitForCondition(cond, () -> U.currentTimeMillis() < endTime0);
+        return waitForCondition(cond, () -> U.currentTimeMillis() < endTime0, checkInterval);
+    }
+
+    /**
+     * Waits for condition, polling in busy wait loop for as long as {@code wait} returns {@code true},
+     * sleeping {@code DFLT_BUSYWAIT_SLEEP_INTERVAL} milliseconds between two consecutive checks.
+     *
+     * @param cond Condition to wait for.
+     * @param wait Wait predicate: polling stops once it returns {@code false}.
+     * @return {@code true} if condition was achieved, {@code false} otherwise.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public static boolean waitForCondition(
+        GridAbsPredicate cond,
+        BooleanSupplier wait
+    ) throws IgniteInterruptedCheckedException {
+        return waitForCondition(cond, wait, DFLT_BUSYWAIT_SLEEP_INTERVAL);
     }
/**
* @param cond Condition to wait for.
* @param wait Wait predicate.
* @return {@code true} if condition was achieved, {@code false} otherwise.
+ * @param checkInterval Time interval between two consecutive condition checks.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- public static boolean waitForCondition(GridAbsPredicate cond, BooleanSupplier wait) throws IgniteInterruptedCheckedException {
+ public static boolean waitForCondition(
+ GridAbsPredicate cond,
+ BooleanSupplier wait,
+ long checkInterval
+ ) throws IgniteInterruptedCheckedException {
while (wait.getAsBoolean()) {
if (cond.apply())
return true;
- U.sleep(DFLT_BUSYWAIT_SLEEP_INTERVAL);
+ U.sleep(checkInterval);
}
return false;
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 16004989f20..dfeb9c2c1f9 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -684,7 +684,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
}
/** */
- private void beforeFirstTest() throws Exception {
+ protected void beforeFirstTest() throws Exception {
sharedStaticIpFinder = new TcpDiscoveryVmIpFinder(true);
clsLdr = Thread.currentThread().getContextClassLoader();
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 5a1ab1ddcd8..bbda79b0609 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.testframework.junits.common;
+import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
@@ -114,6 +115,8 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -157,6 +160,8 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
/**
* Super class for all common tests.
@@ -2781,4 +2786,17 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
dbMgr.checkpointReadUnlock();
}
}
+
+    /**
+     * Looks up the distributed property that holds the compute job worker interrupt timeout.
+     *
+     * @param n Node whose distributed configuration is queried; must be an {@link IgniteEx}.
+     * @return Property registered under {@link GridJobProcessor#COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT};
+     *      never {@code null} (the method fails the test otherwise).
+     */
+    protected DistributedChangeableProperty<Serializable> computeJobWorkerInterruptTimeout(Ignite n) {
+        IgniteEx node = (IgniteEx)n;
+
+        DistributedChangeableProperty<Serializable> prop = node.context().distributedConfiguration()
+            .property(GridJobProcessor.COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT);
+
+        // Fail fast if the property is missing on this node.
+        assertThat(prop, notNullValue());
+
+        return prop;
+    }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
index c88ab8a708d..4c172b398a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
@@ -77,7 +77,7 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr
@Override public void evaluate() throws Throwable {
GridSpiAbstractTest testClsInstance = (GridSpiAbstractTest)description.getTestClass().newInstance();
try {
- testClsInstance.beforeFirstTest();
+ testClsInstance.beforeFirstTestInternal();
base.evaluate();
}
@@ -157,7 +157,7 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr
}
/** */
- private void beforeFirstTest() throws Exception {
+ protected void beforeFirstTestInternal() throws Exception {
if (autoStart) {
GridSpiTest spiTest = GridTestUtils.getAnnotation(getClass(), GridSpiTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 8ad91a84bfa..f34166a6a09 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.processors.compute.ComputeJobStatusTest;
import org.apache.ignite.internal.processors.compute.ComputeTaskWithWithoutFullSupportTest;
import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorConfigurationSelfTest;
import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
+import org.apache.ignite.internal.processors.compute.InterruptComputeJobTest;
import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
import org.apache.ignite.internal.util.StripedExecutorTest;
import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest;
@@ -181,7 +182,8 @@ import org.junit.runners.Suite;
ComputeGridMonitorTest.class,
ComputeJobChangePriorityTest.class,
ComputeJobStatusTest.class,
- ComputeTaskWithWithoutFullSupportTest.class
+ ComputeTaskWithWithoutFullSupportTest.class,
+ InterruptComputeJobTest.class
})
public class IgniteComputeGridTestSuite {
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
index a70c9ec353f..c8f828bb201 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
@@ -107,6 +107,9 @@ public class KillCommandsMXBeanTest extends GridCommonAbstractTest {
srvs.get(0).cluster().state(ACTIVE);
+ // We change to reduce the waiting time for interrupting compute job.
+ computeJobWorkerInterruptTimeout(srvs.get(0)).propagate(100L);
+
IgniteCache<Object, Object> cache = startCli.getOrCreateCache(
new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
index 7dae6b5af94..65163ddaae1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
@@ -95,6 +95,9 @@ public class KillCommandsSQLTest extends GridCommonAbstractTest {
srvs.get(0).cluster().state(ACTIVE);
+ // We change to reduce the waiting time for interrupting compute job.
+ computeJobWorkerInterruptTimeout(srvs.get(0)).propagate(100L);
+
IgniteCache<Object, Object> cache = startCli.getOrCreateCache(
new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));