You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2022/05/05 11:13:56 UTC
[ignite] branch master updated: Revert "IGNITE-16916 Job cancellation routine improvement: configurable graceful stop period before interrupting job worker thread - Fixes #10005."
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d1b236b1928 Revert "IGNITE-16916 Job cancellation routine improvement: configurable graceful stop period before interrupting job worker thread - Fixes #10005."
d1b236b1928 is described below
commit d1b236b1928436bdb1c48a4465eee17fde6b43ce
Author: Sergey Chugunov <se...@gmail.com>
AuthorDate: Thu May 5 14:13:23 2022 +0300
Revert "IGNITE-16916 Job cancellation routine improvement: configurable graceful stop period before interrupting job worker thread - Fixes #10005."
This reverts commit 7357847369079925289114f650a506408812fe4c.
---
.../managers/discovery/GridDiscoveryManager.java | 4 +-
.../cache/GridCacheSharedTtlCleanupManager.java | 12 +-
.../cache/binary/BinaryMetadataFileStore.java | 2 +-
.../cache/persistence/checkpoint/Checkpointer.java | 8 +-
.../persistence/wal/FileWriteAheadLogManager.java | 22 +-
.../wal/filehandle/FileHandleManagerImpl.java | 6 +-
.../DistributedConfigurationProcessor.java | 5 +-
.../internal/processors/job/GridJobProcessor.java | 36 +-
.../internal/processors/job/GridJobWorker.java | 80 +---
.../job/JobWorkerInterruptionTimeoutObject.java | 87 -----
.../persistence/DmsDataWriterWorker.java | 4 +-
.../processors/timeout/GridTimeoutObject.java | 6 +-
.../processors/timeout/GridTimeoutProcessor.java | 2 +-
.../ignite/internal/util/StripedExecutor.java | 2 +-
.../ignite/internal/util/nio/GridNioServer.java | 2 +-
.../ignite/internal/util/worker/GridWorker.java | 54 +--
.../internal/GridCancelOnGridStopSelfTest.java | 5 +-
.../internal/GridCancelUnusedJobSelfTest.java | 30 +-
.../internal/GridCancelledJobsMetricsSelfTest.java | 32 +-
.../GridFailoverCustomTopologySelfTest.java | 27 +-
.../GridMultithreadedJobStealingSelfTest.java | 3 -
.../internal/GridStopWithCancelSelfTest.java | 3 -
.../GridTaskFutureImplStopGridSelfTest.java | 19 +-
.../internal/metric/SystemViewComputeJobTest.java | 3 -
.../processors/compute/ComputeJobStatusTest.java | 6 -
.../compute/InterruptComputeJobTest.java | 416 ---------------------
...ridSessionCancelSiblingsFromFutureSelfTest.java | 60 ++-
.../GridSessionCancelSiblingsFromJobSelfTest.java | 62 ++-
.../GridSessionCancelSiblingsFromTaskSelfTest.java | 64 ++--
.../apache/ignite/testframework/GridTestUtils.java | 45 +--
.../testframework/junits/GridAbstractTest.java | 2 +-
.../junits/common/GridCommonAbstractTest.java | 18 -
.../junits/spi/GridSpiAbstractTest.java | 4 +-
.../testsuites/IgniteComputeGridTestSuite.java | 4 +-
.../apache/ignite/util/KillCommandsMXBeanTest.java | 3 -
.../apache/ignite/util/KillCommandsSQLTest.java | 3 -
36 files changed, 209 insertions(+), 932 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 713d80e662f..81f89479214 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2897,7 +2897,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (!isInterruptedException)
U.error(log, "Exception in discovery notifier worker thread.", t);
- if (!isInterruptedException || !isCancelled.get()) {
+ if (!isInterruptedException || !isCancelled) {
FailureType type = t instanceof OutOfMemoryError ? CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION;
ctx.failure().process(new FailureContext(type, t));
@@ -3077,7 +3077,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
onIdle();
}
catch (InterruptedException e) {
- if (!isCancelled.get())
+ if (!isCancelled)
ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, e));
throw e;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
index dc3aa396e8a..da2b793e242 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -164,12 +164,6 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
cctx.exchange().affinityReadyFuture(AffinityTopologyVersion.ZERO).get();
}
catch (IgniteCheckedException ex) {
- if (cctx.kernalContext().isStopping()) {
- isCancelled.set(true);
-
- return; // Node is stopped before affinity has prepared.
- }
-
throw new IgniteException("Failed to wait for initialization topology [err="
+ ex.getMessage() + ']', ex);
}
@@ -220,13 +214,13 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
}
catch (Throwable t) {
if (X.hasCause(t, NodeStoppingException.class)) {
- isCancelled.set(true); // Treat node stopping as valid worker cancellation.
+ isCancelled = true; // Treat node stopping as valid worker cancellation.
return;
}
if (!(t instanceof IgniteInterruptedCheckedException || t instanceof InterruptedException)) {
- if (isCancelled.get())
+ if (isCancelled)
return;
err = t;
@@ -235,7 +229,7 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
throw t;
}
finally {
- if (err == null && !isCancelled.get())
+ if (err == null && !isCancelled)
err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
index b994c4b7d06..c412a5dca2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
@@ -458,7 +458,7 @@ class BinaryMetadataFileStore {
body0();
}
catch (InterruptedException e) {
- if (!isCancelled.get()) {
+ if (!isCancelled) {
ctx.failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, e));
throw e;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index 457c668964f..051d51779bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -294,7 +294,7 @@ public class Checkpointer extends GridWorker {
throw t;
}
finally {
- if (err == null && !(isCancelled.get()))
+ if (err == null && !(isCancelled))
err = new IllegalStateException("Thread is terminated unexpectedly: " + name());
if (err instanceof OutOfMemoryError)
@@ -826,7 +826,7 @@ public class Checkpointer extends GridWorker {
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
- isCancelled.set(true);
+ isCancelled = true;
}
}
@@ -887,7 +887,7 @@ public class Checkpointer extends GridWorker {
log.debug("Cancelling grid runnable: " + this);
// Do not interrupt runner thread.
- isCancelled.set(true);
+ isCancelled = true;
synchronized (this) {
notifyAll();
@@ -915,7 +915,7 @@ public class Checkpointer extends GridWorker {
public void shutdownNow() {
shutdownNow = true;
- if (!isCancelled.get())
+ if (!isCancelled)
cancel();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 6711b411a7d..f64b11ac8ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1890,7 +1890,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
private void shutdown() throws IgniteInterruptedCheckedException {
synchronized (this) {
- isCancelled.set(true);
+ isCancelled = true;
notifyAll();
}
@@ -1985,7 +1985,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
Thread.currentThread().interrupt();
synchronized (this) {
- isCancelled.set(true);
+ isCancelled = true;
}
}
catch (Throwable t) {
@@ -2141,7 +2141,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
public void restart() {
assert runner() == null : "FileArchiver is still running";
- isCancelled.set(false);
+ isCancelled = false;
new IgniteThread(archiver).start();
}
@@ -2249,7 +2249,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
void restart() {
assert runner() == null : "FileCompressorWorker is still running.";
- isCancelled.set(false);
+ isCancelled = false;
new IgniteThread(this).start();
}
@@ -2530,7 +2530,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " +
"[tmp=" + unzipTmp + ", raw=" + unzip + "]", e);
}
- else if (!isCancelled.get()) {
+ else if (!isCancelled) {
ex = new IgniteCheckedException("Error during WAL segment decompression [segmentIdx=" +
segmentToDecompress + "]", e);
}
@@ -2549,14 +2549,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (InterruptedException e) {
Thread.currentThread().interrupt();
- if (!isCancelled.get())
+ if (!isCancelled)
err = e;
}
catch (Throwable t) {
err = t;
}
finally {
- if (err == null && !isCancelled.get())
+ if (err == null && !isCancelled)
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
@@ -2605,7 +2605,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
void restart() {
assert runner() == null : "FileDecompressor is still running.";
- isCancelled.set(false);
+ isCancelled = false;
new IgniteThread(this).start();
}
@@ -3292,7 +3292,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (IgniteInterruptedCheckedException e) {
Thread.currentThread().interrupt();
- isCancelled.set(true);
+ isCancelled = true;
}
catch (Throwable t) {
err = t;
@@ -3314,7 +3314,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws IgniteInterruptedCheckedException If failed to wait for worker shutdown.
*/
private void shutdown() throws IgniteInterruptedCheckedException {
- isCancelled.set(true);
+ isCancelled = true;
U.join(this);
}
@@ -3325,7 +3325,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
public void restart() {
assert runner() == null : "FileCleaner is still running";
- isCancelled.set(false);
+ isCancelled = false;
new IgniteThread(this).start();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index 8d6d97f4700..278ae632c56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -412,7 +412,7 @@ public class FileHandleManagerImpl implements FileHandleManager {
unparkWaiters(MAX_VALUE);
- if (err == null && !isCancelled.get())
+ if (err == null && !isCancelled)
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
@@ -592,7 +592,7 @@ public class FileHandleManagerImpl implements FileHandleManager {
public void restart() {
assert runner() == null : "WALWriter is still running.";
- isCancelled.set(false);
+ isCancelled = false;
new IgniteThread(this).start();
}
@@ -649,7 +649,7 @@ public class FileHandleManagerImpl implements FileHandleManager {
public void restart() {
assert runner() == null : "WalSegmentSyncer is running.";
- isCancelled.set(false);
+ isCancelled = false;
new IgniteThread(walSegmentSyncWorker).start();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
index 5bae8fb052f..5316f627c99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageL
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.ACTUALIZE;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.CLUSTER_WIDE_UPDATE;
@@ -125,7 +124,7 @@ public class DistributedConfigurationProcessor extends GridProcessorAdapter impl
* @param propKey Key of specific property.
* @return Property key for meta storage.
*/
- public static String toMetaStorageKey(String propKey) {
+ private static String toMetaStorageKey(String propKey) {
return DIST_CONF_PREFIX + propKey;
}
@@ -170,7 +169,7 @@ public class DistributedConfigurationProcessor extends GridProcessorAdapter impl
* @param name Property name.
* @return Public property.
*/
- public @Nullable DistributedChangeableProperty<Serializable> property(String name) {
+ public DistributedChangeableProperty<Serializable> property(String name) {
DistributedChangeableProperty<?> p = props.get(name);
if (!(p instanceof DistributedChangeableProperty))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 71e62477676..122b2ea66e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -74,7 +74,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
@@ -88,7 +87,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
@@ -115,11 +113,9 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
-import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty.detachedLongProperty;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.CPU_LOAD;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
@@ -170,12 +166,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
/** Total jobs waiting time metric name. */
public static final String WAITING_TIME = "WaitingTime";
- /**
- * Distributed property that defines the timeout for interrupting the
- * {@link GridJobWorker worker} after {@link GridJobWorker#cancel() cancellation} in mills.
- */
- public static final String COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT = "computeJobWorkerInterruptTimeout";
-
/** */
private final Marshaller marsh;
@@ -322,10 +312,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
@Nullable private final String jobPriAttrKey;
- /** Timeout interrupt {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel} im mills. */
- private final DistributedLongProperty computeJobWorkerInterruptTimeout =
- detachedLongProperty(COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT);
-
/**
* @param ctx Kernal context.
*/
@@ -392,15 +378,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
taskPriAttrKey = null;
jobPriAttrKey = null;
}
-
- ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
- computeJobWorkerInterruptTimeout.addListener(makeUpdateListener(
- "Compute job parameter '%s' was changed from '%s' to '%s'",
- log
- ));
-
- dispatcher.registerProperty(computeJobWorkerInterruptTimeout);
- });
}
/** {@inheritDoc} */
@@ -901,7 +878,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
* In most cases this method should be called from main read lock
* to avoid jobs activation after node stop has started.
*/
- public void handleCollisions() {
+ private void handleCollisions() {
assert !jobAlwaysActivate;
if (handlingCollision.get()) {
@@ -1350,9 +1327,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
holdLsnr,
partsReservation,
req.getTopVer(),
- req.executorName(),
- this::computeJobWorkerInterruptTimeout
- );
+ req.executorName());
jobCtx.job(job);
@@ -2439,11 +2414,4 @@ public class GridJobProcessor extends GridProcessorAdapter {
else
return w -> sesId.equals(w.getSession().getId()) && jobId.equals(w.getJobId());
}
-
- /**
- * @return Interruption timeout of {@link GridJobWorker workers} (in millis) after {@link GridWorker#cancel cancel} is called.
- */
- public long computeJobWorkerInterruptTimeout() {
- return computeJobWorkerInterruptTimeout.getOrDefault(ctx.config().getFailureDetectionTimeout());
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index c1dc7f4be29..09ef5adb006 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.LongSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -93,7 +92,11 @@ import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.SUS
*/
public class GridJobWorker extends GridWorker implements GridTimeoutObject {
/** Per-thread held flag. */
- private static final ThreadLocal<Boolean> HOLD = ThreadLocal.withInitial(() -> false);
+ private static final ThreadLocal<Boolean> HOLD = new ThreadLocal<Boolean>() {
+ @Override protected Boolean initialValue() {
+ return false;
+ }
+ };
/** Static logger to avoid re-creation. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -186,13 +189,6 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
private volatile ComputeJobStatusEnum status = QUEUED;
/**
- * Supplier of timeout interrupt {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel} im mills.
- */
- private final LongSupplier jobInterruptTimeoutSupplier;
-
- /**
- * Constructor.
- *
* @param ctx Kernal context.
* @param dep Grid deployment.
* @param createTime Create time.
@@ -207,8 +203,6 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
* @param partsReservation Reserved partitions (must be released at the job finish).
* @param reqTopVer Affinity topology version of the job request.
* @param execName Custom executor name.
- * @param jobInterruptTimeoutSupplier Supplier of timeout interrupt
- * {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel} im mills.
*/
GridJobWorker(
GridKernalContext ctx,
@@ -224,11 +218,10 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
GridJobHoldListener holdLsnr,
GridReservable partsReservation,
AffinityTopologyVersion reqTopVer,
- String execName,
- LongSupplier jobInterruptTimeoutSupplier
- ) {
+ String execName) {
super(ctx.igniteInstanceName(), "grid-job-worker", ctx.log(GridJobWorker.class));
+ assert ctx != null;
assert ses != null;
assert jobCtx != null;
assert taskNode != null;
@@ -249,7 +242,6 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
this.partsReservation = partsReservation;
this.reqTopVer = reqTopVer;
this.execName = execName;
- this.jobInterruptTimeoutSupplier = jobInterruptTimeoutSupplier;
if (job != null)
this.job = job;
@@ -1167,64 +1159,6 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
return jobId.hashCode();
}
- /** {@inheritDoc} */
- @Override protected void onCancel(boolean firstCancelRequest) {
- if (firstCancelRequest)
- handleCancel();
- else {
- if (log.isDebugEnabled()) {
- Thread runner = runner();
-
- log.debug(String.format(
- "Worker cancellation is ignored [jobId=%s, interrupted=%s]",
- getJobId(),
- runner == null ? "unknown" : runner.isInterrupted()
- ));
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void onCancelledBeforeWorkerScheduled() {
- // To ensure that the worker does not hang if the cancellation was before the start of its execution.
- handleCancel();
- }
-
- /**
- * Handles the cancellation of the worker.
- */
- private void handleCancel() {
- long timeout = jobInterruptTimeoutSupplier.getAsLong();
-
- if (timeout > 0) {
- ctx.timeout().addTimeoutObject(
- new JobWorkerInterruptionTimeoutObject(this, U.currentTimeMillis() + timeout)
- );
-
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Worker will be interrupted later [jobId=%s, timeout=%s]",
- getJobId(),
- U.humanReadableDuration(timeout)
- ));
- }
- }
- else {
- Thread runner = runner();
-
- if (runner != null)
- runner.interrupt();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Worker is interrupted on cancel [jobId=%s, interrupted=%s]",
- getJobId(),
- runner == null ? "unknown" : runner.isInterrupted()
- ));
- }
- }
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridJobWorker.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/JobWorkerInterruptionTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/JobWorkerInterruptionTimeoutObject.java
deleted file mode 100644
index 3e5b37bf69e..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/JobWorkerInterruptionTimeoutObject.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.job;
-
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-
-/**
- * Timeout object for delayed {@link GridJobWorker worker} interruption.
- *
- * <p>After calling {@link GridJobWorker#cancel} the worker should try to complete gracefully,
- * if it doesn't then it will {@link Thread#interrupt interrupt} after some time.
- */
-public class JobWorkerInterruptionTimeoutObject implements GridTimeoutObject {
- /** Compute job worker. */
- private final GridJobWorker jobWorker;
-
- /** ID of timeout object. */
- private final IgniteUuid id;
-
- /** Time when the timeout object should be executed in mills. */
- private final long endTime;
-
- /**
- * Constructor.
- *
- * @param jobWorker Compute job worker.
- * @param endTime Time when the timeout object should be executed in mills.
- */
- public JobWorkerInterruptionTimeoutObject(
- GridJobWorker jobWorker,
- long endTime
- ) {
- this.jobWorker = jobWorker;
- this.endTime = endTime;
-
- id = IgniteUuid.randomUuid();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() {
- return id;
- }
-
- /** {@inheritDoc} */
- @Override public long endTime() {
- return endTime;
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- assert jobWorker.isCancelled() : jobWorker;
-
- Thread runner = jobWorker.runner();
-
- if (runner != null && !jobWorker.isDone())
- runner.interrupt();
- }
-
- /**
- * @return Compute job worker.
- */
- public GridJobWorker jobWorker() {
- return jobWorker;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(JobWorkerInterruptionTimeoutObject.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
index bb973857e8a..ebc5c73bc5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
@@ -100,7 +100,7 @@ public class DmsDataWriterWorker extends GridWorker {
/** Start new distributed metastorage worker thread. */
public void start() {
- isCancelled.set(false);
+ isCancelled = false;
new IgniteThread(igniteInstanceName(), "dms-writer-thread", this).start();
}
@@ -187,7 +187,7 @@ public class DmsDataWriterWorker extends GridWorker {
updateQueue.offer(new FutureTask<>(() -> STOP));
latch.countDown();
- isCancelled.set(true);
+ isCancelled = true;
Thread runner = runner();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java
index 7f7b730126a..692bad5aa1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutObject.java
@@ -26,15 +26,15 @@ public interface GridTimeoutObject {
/**
* @return ID of the object.
*/
- IgniteUuid timeoutId();
+ public IgniteUuid timeoutId();
/**
* @return End time.
*/
- long endTime();
+ public long endTime();
/**
* Timeout callback.
*/
- void onTimeout();
+ public void onTimeout();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index f0b3d3b94ae..7efcea9c8a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -296,7 +296,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
throw t;
}
finally {
- if (err == null && !isCancelled.get())
+ if (err == null && !isCancelled)
err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly.");
if (err instanceof OutOfMemoryError)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index bff78e8a3d6..50d9b9ef399 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -658,7 +658,7 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
}
}
- if (!isCancelled.get()) {
+ if (!isCancelled) {
errHnd.apply(new IllegalStateException("Thread " + Thread.currentThread().getName() +
" is terminated unexpectedly"));
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 2f3e8e55295..50ff443a9d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2276,7 +2276,7 @@ public class GridNioServer<T> {
}
// select() call above doesn't throw on interruption; checking it here to propagate timely.
- if (!closed && !isCancelled.get() && Thread.interrupted())
+ if (!closed && !isCancelled && Thread.interrupted())
throw new InterruptedException();
}
finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
index 85959950f33..615d5062a94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.util.worker;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
@@ -50,7 +49,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
private volatile boolean finished;
/** Whether or not this runnable is cancelled. */
- protected final AtomicBoolean isCancelled = new AtomicBoolean();
+ protected volatile boolean isCancelled;
/** Actual thread runner. */
private volatile Thread runner;
@@ -115,8 +114,9 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
log.debug("Grid runnable started: " + name);
try {
- if (isCancelled.get())
- onCancelledBeforeWorkerScheduled();
+ // Special case, when task gets cancelled before it got scheduled.
+ if (isCancelled)
+ runner.interrupt();
// Listener callback.
if (lsnr != null)
@@ -160,7 +160,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
lsnr.onStopped(this);
if (log.isDebugEnabled())
- if (isCancelled.get())
+ if (isCancelled)
log.debug("Grid runnable finished due to cancellation: " + name);
else if (runner.isInterrupted())
log.debug("Grid runnable finished due to interruption without cancellation: " + name);
@@ -191,9 +191,9 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
}
/**
- * @return Runner thread, {@code null} if the worker has not yet started executing.
+ * @return Runner thread.
*/
- public @Nullable Thread runner() {
+ public Thread runner() {
return runner;
}
@@ -216,13 +216,20 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
}
/**
- * Cancels this runnable.
+ * Cancels this runnable interrupting actual runner.
*/
public void cancel() {
if (log.isDebugEnabled())
log.debug("Cancelling grid runnable: " + this);
- onCancel(isCancelled.compareAndSet(false, true));
+ isCancelled = true;
+
+ Thread runner = this.runner;
+
+ // Cannot apply Future.cancel() because if we do, then Future.get() would always
+ // throw CancellationException and we would not be able to wait for task completion.
+ if (runner != null)
+ runner.interrupt();
}
/**
@@ -234,7 +241,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
if (log.isDebugEnabled())
log.debug("Joining grid runnable: " + this);
- if ((runner == null && isCancelled.get()) || finished)
+ if ((runner == null && isCancelled) || finished)
return;
synchronized (mux) {
@@ -252,7 +259,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
public boolean isCancelled() {
Thread runner = this.runner;
- return isCancelled.get() || (runner != null && runner.isInterrupted());
+ return isCancelled || (runner != null && runner.isInterrupted());
}
/**
@@ -299,31 +306,6 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher {
lsnr.onIdle(this);
}
- /**
- * Callback on runner cancellation.
- *
- * @param firstCancelRequest Flag indicating that worker cancellation was requested for the first time.
- */
- protected void onCancel(boolean firstCancelRequest) {
- Thread runner = this.runner;
-
- // Cannot apply Future.cancel() because if we do, then Future.get() would always
- // throw CancellationException, and we would not be able to wait for task completion.
- if (runner != null)
- runner.interrupt();
- }
-
- /**
- * Callback on special case, when task is cancelled before is has been scheduled.
- */
- protected void onCancelledBeforeWorkerScheduled() {
- Thread runner = this.runner;
-
- assert runner != null : this;
-
- runner.interrupt();
- }
-
/** {@inheritDoc} */
@Override public String toString() {
Thread runner = this.runner;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
index a11ecd762ee..b921106d13c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelOnGridStopSelfTest.java
@@ -60,9 +60,6 @@ public class GridCancelOnGridStopSelfTest extends GridCommonAbstractTest {
cancelCall = false;
try (Ignite g = startGrid(1)) {
- // We change it because compute jobs will go to sleep.
- assertTrue(computeJobWorkerInterruptTimeout(g).propagate(10L));
-
cnt = new CountDownLatch(1);
g.compute().executeAsync(CancelledTask.class, null);
@@ -70,7 +67,7 @@ public class GridCancelOnGridStopSelfTest extends GridCommonAbstractTest {
cnt.await();
}
- assertTrue(cancelCall);
+ assert cancelCall;
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java
index 564793b93ae..1c44ad125b6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelUnusedJobSelfTest.java
@@ -43,9 +43,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-
/**
* Cancel unused job test.
*/
@@ -96,32 +93,31 @@ public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest {
public void testCancel() throws Exception {
Ignite ignite = G.ignite(getTestIgniteInstanceName());
- // We change it because compute jobs will go to sleep.
- assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
-
ignite.compute().localDeployTask(GridCancelTestTask.class, U.detectClassLoader(GridCancelTestTask.class));
ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridCancelTestTask.class.getName(), null);
- assertNotNull(fut);
-
// Wait until jobs begin execution.
- assertTrue("Jobs did not start.", startSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS));
+ boolean await = startSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+
+ assert await : "Jobs did not start.";
info("Test task result: " + fut);
+ assert fut != null;
+
// Only first job should successfully complete.
- assertThat(fut.get(getTestTimeout()), equalTo(1));
+ Object res = fut.get();
+ assert (Integer)res == 1;
// Wait for all jobs to finish.
- assertTrue("Jobs did not stop.", stopSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS));
+ await = stopSignal.await(WAIT_TIME, TimeUnit.MILLISECONDS);
+ assert await : "Jobs did not stop.";
// One is definitely processed. But there might be some more processed or cancelled or processed and cancelled.
// Thus total number should be at least SPLIT_COUNT and at most (SPLIT_COUNT - 1) *2 +1
- assertTrue(
- "Invalid cancel count value: " + cancelCnt,
- (cancelCnt + processedCnt) >= SPLIT_COUNT && (cancelCnt + processedCnt) <= (SPLIT_COUNT - 1) * 2 + 1
- );
+ assert (cancelCnt + processedCnt) >= SPLIT_COUNT && (cancelCnt + processedCnt) <= (SPLIT_COUNT - 1) * 2 + 1 :
+ "Invalid cancel count value: " + cancelCnt;
}
/**
@@ -175,8 +171,6 @@ public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest {
private ComputeTaskSession ses;
/**
- * Constructor.
- *
* @param arg Argument.
*/
private GridCancelTestJob(Integer arg) {
@@ -185,7 +179,7 @@ public class GridCancelUnusedJobSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public Serializable execute() {
- int arg = argument(0);
+ int arg = this.<Integer>argument(0);
try {
if (log.isInfoEnabled())
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java
index 0547a01bc71..f5c7fc9175f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridCancelledJobsMetricsSelfTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.resources.LoggerResource;
@@ -43,14 +44,11 @@ import org.apache.ignite.spi.collision.CollisionJobContext;
import org.apache.ignite.spi.collision.CollisionSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
-import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-
/**
* Cancelled jobs metrics self test.
*/
@@ -87,24 +85,25 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
public void testCancelledJobs() throws Exception {
final Ignite ignite = G.ignite(getTestIgniteInstanceName());
- // We change it because compute jobs will go to sleep.
- assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
-
Collection<ComputeTaskFuture<?>> futs = new ArrayList<>();
for (int i = 1; i <= 10; i++)
futs.add(ignite.compute().executeAsync(CancelledTask.class, null));
// Wait to be sure that metrics were updated.
- waitForCondition(() -> ignite.cluster().localNode().metrics().getTotalCancelledJobs() > 0, 5000);
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return ignite.cluster().localNode().metrics().getTotalCancelledJobs() > 0;
+ }
+ }, 5000);
colSpi.externalCollision();
for (ComputeTaskFuture<?> fut : futs) {
try {
- fut.get(getTestTimeout());
+ fut.get();
- fail("Job was not interrupted.");
+ assert false : "Job was not interrupted.";
}
catch (IgniteException e) {
if (e.hasCause(InterruptedException.class))
@@ -115,7 +114,9 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
}
// Job was cancelled and now we need to calculate metrics.
- assertThat(ignite.cluster().localNode().metrics().getTotalCancelledJobs(), equalTo(10));
+ int totalCancelledJobs = ignite.cluster().localNode().metrics().getTotalCancelledJobs();
+
+ assert totalCancelledJobs == 10 : "Metrics were not updated. Expected 10 got " + totalCancelledJobs;
}
/**
@@ -129,7 +130,7 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public Object reduce(List<ComputeJobResult> results) {
- assertFalse(results.get(0).isCancelled());
+ assert results.get(0).isCancelled() : "Wrong job result status.";
return null;
}
@@ -147,6 +148,13 @@ public class GridCancelledJobsMetricsSelfTest extends GridCommonAbstractTest {
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException ignored) {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1) {
+ throw new IgniteException("Unexpected exception: ", e1);
+ }
+
throw new IgniteException("Job got interrupted while waiting for cancellation.");
}
finally {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java
index fc8371bf081..73e53bf15af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverCustomTopologySelfTest.java
@@ -43,9 +43,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-
/**
* Test failover and custom topology. Topology returns local node if remote node fails.
*/
@@ -90,10 +87,11 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
@Test
public void testFailoverTopology() throws Exception {
try {
- Ignite ignite1 = startGrids(2);
+ Ignite ignite1 = startGrid(1);
+ Ignite ignite2 = startGrid(2);
- // We change it because compute jobs will go to sleep.
- assertTrue(computeJobWorkerInterruptTimeout(ignite1).propagate(10L));
+ assert ignite1 != null;
+ assert ignite2 != null;
ignite1.compute().localDeployTask(JobTask.class, JobTask.class.getClassLoader());
@@ -106,9 +104,9 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
mux.wait();
}
- stopAndCancelGrid(1);
+ stopAndCancelGrid(2);
- String res = fut.get(getTestTimeout());
+ String res = fut.get();
info("Task result: " + res);
}
@@ -118,10 +116,13 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
info("Failed over: " + failCnt.get());
- assertThat(failCnt.get(), equalTo(1));
+ assert failCnt.get() == 1 : "Invalid fail over counter [expected=1, actual=" + failCnt.get() + ']';
}
finally {
- stopAllGrids();
+ stopGrid(1);
+
+ // Stopping stopped instance just in case.
+ stopGrid(2);
}
}
@@ -138,11 +139,11 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) {
- assertNotNull(ignite);
+ assert ignite != null;
UUID locNodeId = ignite.configuration().getNodeId();
- assertNotNull(locNodeId);
+ assert locNodeId != null;
if (log.isInfoEnabled())
log.info("Mapping jobs [subgrid=" + subgrid + ", arg=" + arg + ']');
@@ -166,7 +167,7 @@ public class GridFailoverCustomTopologySelfTest extends GridCommonAbstractTest {
UUID nodeId = ignite.configuration().getNodeId();
- assertNotNull(nodeId);
+ assert nodeId != null;
if (!nodeId.equals(argument(0))) {
try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
index 2ede9cbb4fc..be451625462 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
@@ -73,9 +73,6 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
stopAllGrids();
ignite = startGridsMultiThreaded(2);
-
- // We are changing it because compute jobs fall asleep.
- assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java
index 9fc28dbca87..522ed7786a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridStopWithCancelSelfTest.java
@@ -71,9 +71,6 @@ public class GridStopWithCancelSelfTest extends GridCommonAbstractTest {
try {
Ignite ignite = startGrid("testGrid");
- // We are changing it because compute jobs fall asleep.
- assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
-
executeAsync(ignite.compute(), CancelledTask.class, null);
cnt.await();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java
index 606b5d26ff5..5f9d9bd5669 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFutureImplStopGridSelfTest.java
@@ -40,9 +40,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.startsWith;
-
/**
* Test for task future when grid stops.
*/
@@ -75,17 +72,17 @@ public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest {
public void testGet() throws Exception {
Ignite ignite = startGrid(getTestIgniteInstanceName());
- // We change it because compute jobs fall asleep.
- assertTrue(computeJobWorkerInterruptTimeout(ignite).propagate(10L));
-
Thread futThread = null;
try {
final ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridStopTestTask.class.getName(), null);
- fut.listen((CI1<IgniteFuture>)gridFut -> {
- synchronized (mux) {
- mux.notifyAll();
+ fut.listen(new CI1<IgniteFuture>() {
+ @SuppressWarnings({"NakedNotify"})
+ @Override public void apply(IgniteFuture gridFut) {
+ synchronized (mux) {
+ mux.notifyAll();
+ }
}
});
@@ -107,7 +104,7 @@ public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest {
failed.set(true);
// Make sure that message contains info about stopping grid.
- assertThat(e.getMessage(), startsWith("Task failed due to stopping of the grid:"));
+ assert e.getMessage().startsWith("Task failed due to stopping of the grid:");
}
finally {
latch.countDown();
@@ -138,7 +135,7 @@ public class GridTaskFutureImplStopGridSelfTest extends GridCommonAbstractTest {
info("Test task result [failed=" + failed.get() + ", taskFuture=" + fut + ']');
- assertTrue(finished);
+ assert finished : "Future thread was not stopped.";
assert fut.isDone();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java
index 6bd6daf901f..f2475eab96a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewComputeJobTest.java
@@ -97,9 +97,6 @@ public class SystemViewComputeJobTest extends GridCommonAbstractTest {
cache = server.createCache("test-cache");
cache.put(1, 1);
-
- // We are changing it because compute jobs fall asleep.
- assertTrue(computeJobWorkerInterruptTimeout(server).propagate(10L));
}
/** Tests work of {@link SystemView} for compute grid {@link IgniteCompute#broadcastAsync(IgniteRunnable)} call. */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
index e6ece4a4b3f..17c783dc98a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
@@ -91,9 +91,6 @@ public class ComputeJobStatusTest extends GridCommonAbstractTest {
node0 = crd;
node1 = grid(1);
-
- // We are changing it because compute jobs fall asleep.
- assertTrue(computeJobWorkerInterruptTimeout(node0).propagate(10L));
}
/** {@inheritDoc} */
@@ -261,9 +258,6 @@ public class ComputeJobStatusTest extends GridCommonAbstractTest {
U.sleep(100);
checkTaskJobStatuses(sesId, FINISHED, null);
-
- // Let's wait a bit for the callcc (above) to complete.
- U.sleep(100);
}
// Let's check that the job (WaitJob) on the node0 has finished
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java
deleted file mode 100644
index 0bd42eae51f..00000000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/InterruptComputeJobTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.compute;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobSibling;
-import org.apache.ignite.compute.ComputeTaskAdapter;
-import org.apache.ignite.compute.ComputeTaskFuture;
-import org.apache.ignite.compute.ComputeTaskSession;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.failure.FailureHandler;
-import org.apache.ignite.failure.StopNodeFailureHandler;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.job.GridJobProcessor;
-import org.apache.ignite.internal.processors.job.GridJobWorker;
-import org.apache.ignite.internal.processors.job.JobWorkerInterruptionTimeoutObject;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.spi.collision.CollisionContext;
-import org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
-import org.junit.Test;
-
-import static java.util.function.Function.identity;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
-import static org.apache.ignite.cluster.ClusterState.ACTIVE;
-import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.toMetaStorageKey;
-import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
-import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-
-/**
- * {@link GridJobWorker} interrupt testing.
- */
-public class InterruptComputeJobTest extends GridCommonAbstractTest {
- /** Node. */
- private static IgniteEx node;
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- stopAllGrids();
-
- node = startGrid();
-
- node.cluster().state(ACTIVE);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- stopAllGrids();
-
- node = null;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- PriorityQueueCollisionSpiEx.collisionSpiEx(node).handleCollision = true;
-
- // Reset distributed property.
- node.context().distributedMetastorage().remove(
- toMetaStorageKey(computeJobWorkerInterruptTimeout(node).getName())
- );
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- // Cleanup with task release.
- CountDownLatchJob.JOBS.removeIf(job -> {
- job.latch.countDown();
-
- return true;
- });
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- return super.getConfiguration(igniteInstanceName).setCollisionSpi(new PriorityQueueCollisionSpiEx());
- }
-
- /** {@inheritDoc} */
- @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
- return new StopNodeFailureHandler();
- }
-
- /**
- * Checks that method {@link GridJobProcessor#computeJobWorkerInterruptTimeout()}
- * returns a valid value that depends on distributed property "computeJobWorkerInterruptTimeout".
- *
- * @throws Exception If failed.
- */
- @Test
- public void testComputeJobWorkerInterruptTimeoutProperty() throws Exception {
- // Check default.
- assertThat(
- node.context().job().computeJobWorkerInterruptTimeout(),
- equalTo(node.context().config().getFailureDetectionTimeout())
- );
-
- // Check update value.
- computeJobWorkerInterruptTimeout(node).propagate(100500L);
-
- assertThat(node.context().job().computeJobWorkerInterruptTimeout(), equalTo(100500L));
- }
-
- /**
- * Checks that when {@link GridJobWorker#cancel()} (even twice) is called, the {@link GridJobWorker#runner()}
- * is not interrupted and that only one {@link JobWorkerInterruptionTimeoutObject} is created.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testCancel() throws Exception {
- computeJobWorkerInterruptTimeout(node).propagate(TimeUnit.HOURS.toMillis(1));
-
- ComputeTaskFuture<Void> taskFut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);
-
- GridJobWorker jobWorker = jobWorker(node, taskFut.getTaskSession());
-
- cancelWitchChecks(jobWorker);
-
- cancelWitchChecks(jobWorker);
-
- ((CountDownLatchJob)jobWorker.getJob()).latch.countDown();
-
- taskFut.get(getTestTimeout());
- }
-
- /**
- * Checks that after {@link GridJobWorker#cancel()}, the {@link JobWorkerInterruptionTimeoutObject}
- * will trigger the {@link Thread#interrupt()}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testInterrupt() throws Exception {
- computeJobWorkerInterruptTimeout(node).propagate(100L);
-
- ComputeTaskFuture<Void> taskFut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);
-
- GridJobWorker jobWorker = jobWorker(node, taskFut.getTaskSession());
-
- cancelWitchChecks(jobWorker);
-
- // We are waiting for the GridJobWorkerInterrupter to interrupt the worker.
- taskFut.get(1_000L);
-
- assertThat(jobWorker.isCancelled(), equalTo(true));
- assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(true));
- assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), empty());
- }
-
- /**
- * Checks that if the worker was {@link GridJobWorker#cancel()} (even twice) before starting work,
- * then it will be canceled, not interrupted, and have one {@link JobWorkerInterruptionTimeoutObject} before
- * and two after the start.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testCancelBeforeStart() throws Exception {
- PriorityQueueCollisionSpiEx.collisionSpiEx(node).handleCollision = false;
-
- computeJobWorkerInterruptTimeout(node).propagate(TimeUnit.HOURS.toMillis(1));
-
- ComputeTaskFuture<Void> taskFut = node.compute().executeAsync(new ComputeTask(CountDownLatchJob.class), null);
-
- GridJobWorker jobWorker = jobWorker(node, taskFut.getTaskSession());
-
- cancelBeforeStartWitchChecks(jobWorker);
-
- cancelBeforeStartWitchChecks(jobWorker);
-
- PriorityQueueCollisionSpiEx.collisionSpiEx(node).handleCollision = true;
-
- node.context().job().handleCollisions();
-
- assertTrue(waitForCondition(jobWorker::isStarted, getTestTimeout(), 10));
-
- assertThat(jobWorker.isCancelled(), equalTo(true));
- assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(false));
- assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), hasSize(2));
- }
-
- /**
- * @param n Node.
- * @param taskSession Task session.
- * @return Job worker is expected to be the only one and either active or passive.
- */
- private static GridJobWorker jobWorker(IgniteEx n, ComputeTaskSession taskSession) {
- Collection<ComputeJobSibling> siblings = taskSession.getJobSiblings();
-
- assertThat(siblings, hasSize(1));
-
- IgniteUuid jobId = F.first(siblings).getJobId();
-
- GridJobWorker jobWorker = n.context().job().activeJob(jobId);
-
- if (jobWorker == null) {
- Map<IgniteUuid, GridJobWorker> passiveJobs = getFieldValue(n.context().job(), "passiveJobs");
-
- if (passiveJobs != null)
- jobWorker = passiveJobs.get(jobId);
- }
-
- assertThat(jobWorker, notNullValue());
-
- return jobWorker;
- }
-
- /**
- * Cancels the worker, checking that it is canceled and not interrupted and only one interrupter is added.
- *
- * @param jobWorker Compute job worker.
- * @throws Exception If failed.
- */
- private void cancelWitchChecks(GridJobWorker jobWorker) throws Exception {
- assertTrue(waitForCondition(jobWorker::isStarted, getTestTimeout(), 10));
-
- jobWorker.cancel();
-
- assertThat(jobWorker.isCancelled(), equalTo(true));
- assertThat(countDownLatchJobInterrupted(jobWorker), equalTo(false));
- assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), hasSize(1));
- }
-
- /**
- * Cancels the worker before it starts, checks that it is canceled, and creates one interrupter.
- *
- * @param jobWorker Compute job worker.
- */
- private void cancelBeforeStartWitchChecks(GridJobWorker jobWorker) {
- jobWorker.cancel();
-
- assertThat(jobWorker.isStarted(), equalTo(false));
- assertThat(jobWorker.runner(), nullValue());
-
- assertThat(jobWorker.isCancelled(), equalTo(true));
- assertThat(jobWorkerInterrupters(timeoutObjects(node), jobWorker), hasSize(1));
- }
-
- /**
- * @param n Node.
- * @return Value of {@code GridTimeoutProcessor#timeoutObjs}.
- */
- private static GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjects(IgniteEx n) {
- return getFieldValue(n.context().timeout(), "timeoutObjs");
- }
-
- /**
- * @param timeoutObjects Value of {@code GridTimeoutProcessor#timeoutObjs}.
- * @param jobWorker Compute job worker.
- * @return Collection of {@link JobWorkerInterruptionTimeoutObject} for {@code jobWorker}.
- */
- private static Collection<JobWorkerInterruptionTimeoutObject> jobWorkerInterrupters(
- GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjects,
- GridJobWorker jobWorker
- ) {
- return timeoutObjects.stream()
- .filter(JobWorkerInterruptionTimeoutObject.class::isInstance)
- .map(JobWorkerInterruptionTimeoutObject.class::cast)
- .filter(o -> o.jobWorker() == jobWorker)
- .collect(toList());
- }
-
- /**
- * @return Value of {@link CountDownLatchJob#interrupted}.
- */
- private static boolean countDownLatchJobInterrupted(GridJobWorker jobWorker) {
- return ((CountDownLatchJob)jobWorker.getJob()).interrupted;
- }
-
- /**
- * Test extension {@link PriorityQueueCollisionSpi}.
- */
- private static class PriorityQueueCollisionSpiEx extends PriorityQueueCollisionSpi {
- /** Collision handling flag. */
- volatile boolean handleCollision = true;
-
- /** {@inheritDoc} */
- @Override public void onCollision(CollisionContext ctx) {
- if (handleCollision)
- super.onCollision(ctx);
- }
-
- /**
- * @param n Node.
- * @return Test extension {@link PriorityQueueCollisionSpi}.
- */
- static PriorityQueueCollisionSpiEx collisionSpiEx(IgniteEx n) {
- return ((PriorityQueueCollisionSpiEx)n.configuration().getCollisionSpi());
- }
- }
-
- /**
- * Task that creates jobs.
- */
- private static class ComputeTask extends ComputeTaskAdapter<Object, Void> {
- /** Compute job class. */
- final Class<? extends ComputeJobAdapter> jobClass;
-
- /**
- * Constructor.
- *
- * @param jobClass Compute job class.
- */
- ComputeTask(Class<? extends ComputeJobAdapter> jobClass) {
- this.jobClass = jobClass;
- }
-
- /** {@inheritDoc} */
- @Override public Map<? extends ComputeJob, ClusterNode> map(
- List<ClusterNode> subgrid,
- @Nullable Object arg
- ) throws IgniteException {
- return subgrid.stream().collect(toMap(n -> newComputeJobInstance(arg), identity()));
- }
-
- /** {@inheritDoc} */
- @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
- return null;
- }
-
- /**
- * @param arg Argument.
- * @return New instance of {@link #jobClass}.
- */
- private ComputeJobAdapter newComputeJobInstance(@Nullable Object arg) {
- try {
- if (arg == null)
- return jobClass.newInstance();
-
- return jobClass.getDeclaredConstructor(arg.getClass()).newInstance(arg);
- }
- catch (Exception e) {
- throw new IgniteException(e);
- }
- }
- }
-
- /**
- * A job that is waiting for the latch counter to decrement in order to complete its work.
- */
- private static class CountDownLatchJob extends ComputeJobAdapter {
- /** All jobs. */
- static final Collection<CountDownLatchJob> JOBS = new ConcurrentLinkedQueue<>();
-
- /** Latch. */
- final CountDownLatch latch = new CountDownLatch(1);
-
- /** Interrupted. */
- volatile boolean interrupted;
-
- /**
- * Constructor.
- */
- public CountDownLatchJob() {
- JOBS.add(this);
- }
-
- /** {@inheritDoc} */
- @Override public Object execute() throws IgniteException {
- try {
- latch.await();
- }
- catch (InterruptedException e) {
- interrupted = true;
-
- Thread.currentThread().interrupt();
- }
-
- return null;
- }
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java
index f9cf4fc9a1c..45d4951a763 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromFutureSelfTest.java
@@ -47,10 +47,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-
/**
* Test of session siblings cancellation from future.
*/
@@ -77,7 +73,7 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
/**
*
*/
- public GridSessionCancelSiblingsFromFutureSelfTest() throws Exception {
+ public GridSessionCancelSiblingsFromFutureSelfTest() {
super(true);
}
@@ -96,14 +92,6 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
return c;
}
- /** {@inheritDoc} */
- @Override protected void beforeFirstTest() throws Exception {
- super.beforeFirstTest();
-
- // We are changing it because compute jobs fall asleep.
- assertTrue(computeJobWorkerInterruptTimeout(G.ignite(getTestIgniteInstanceName())).propagate(10L));
- }
-
/**
* @throws Exception if failed
*/
@@ -126,16 +114,18 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
final AtomicBoolean failed = new AtomicBoolean(false);
- GridTestUtils.runMultiThreaded(() -> {
- int num = sNum.get();
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ int num = sNum.get();
- try {
- checkTask(num);
- }
- catch (Throwable e) {
- error("Failed to execute task.", e);
+ try {
+ checkTask(num);
+ }
+ catch (Throwable e) {
+ error("Failed to execute task.", e);
- failed.set(true);
+ failed.set(true);
+ }
}
}, EXEC_COUNT, "grid-session-test");
@@ -153,11 +143,13 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridTaskSessionTestTask.class, num);
- assertNotNull(fut);
+ assert fut != null;
try {
// Wait until jobs begin execution.
- assertTrue("Jobs did not start.", startSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
+ boolean await = startSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
+
+ assert await : "Jobs did not start.";
Collection<ComputeJobSibling> jobSiblings = fut.getTaskSession().getJobSiblings();
@@ -166,19 +158,23 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
jobSibling.cancel();
}
- Object res = fut.get(getTestTimeout());
+ Object res = fut.get();
- assertThat(res, equalTo("interrupt-task-data"));
+ assert "interrupt-task-data".equals(res) : "Invalid task result: " + res;
// Wait for all jobs to finish.
- assertTrue("Jobs did not cancel.", stopSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
+ await = stopSig[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
+
+ assert await : "Jobs did not cancel.";
+
+ int cnt = interruptCnt[num].get();
- assertThat(interruptCnt[num].get(), equalTo(SPLIT_COUNT));
+ assert cnt == SPLIT_COUNT : "Invalid interrupt count value: " + cnt;
}
finally {
// We must wait for the jobs to be sure that they have completed
// their execution since they use static variable (shared for the tests).
- fut.get(getTestTimeout());
+ fut.get();
}
}
@@ -217,11 +213,11 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
if (log.isInfoEnabled())
log.info("Splitting jobs [task=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
- assertNotNull(arg);
+ assert arg != null;
taskNum = (Integer)arg;
- assertThat(taskNum, not(equalTo(-1)));
+ assert taskNum != -1;
Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);
@@ -230,7 +226,7 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
private volatile Thread thread;
@Override public Serializable execute() {
- assertNotNull(taskSes);
+ assert taskSes != null;
thread = Thread.currentThread();
@@ -259,7 +255,7 @@ public class GridSessionCancelSiblingsFromFutureSelfTest extends GridCommonAbstr
}
@Override public void cancel() {
- assertNotNull(thread);
+ assert thread != null;
interruptCnt[taskNum].incrementAndGet();
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java
index 393a1d109b0..5b24d3a7061 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromJobSelfTest.java
@@ -51,10 +51,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-
/**
* Session cancellation tests.
*/
@@ -98,14 +94,6 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
return c;
}
- /** {@inheritDoc} */
- @Override protected void beforeFirstTest() throws Exception {
- super.beforeFirstTest();
-
- // We are changing it because compute jobs fall asleep.
- assertTrue(computeJobWorkerInterruptTimeout(G.ignite(getTestIgniteInstanceName())).propagate(10L));
- }
-
/**
* @throws Exception If failed.
*/
@@ -128,16 +116,18 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
final AtomicBoolean failed = new AtomicBoolean(false);
- GridTestUtils.runMultiThreaded(() -> {
- int num = sNum.get();
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ int num = sNum.get();
- try {
- checkTask(num);
- }
- catch (Throwable e) {
- error("Failed to execute task.", e);
+ try {
+ checkTask(num);
+ }
+ catch (Throwable e) {
+ error("Failed to execute task.", e);
- failed.set(true);
+ failed.set(true);
+ }
}
}, EXEC_COUNT, "grid-session-test");
@@ -155,22 +145,26 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridTaskSessionTestTask.class, num);
- assertNotNull(fut);
+ boolean await = startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
// Wait until jobs begin execution.
- assertTrue("Jobs did not start.", startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
+ assert await : "Jobs did not start.";
+
+ assert fut != null;
- Object res = fut.get(getTestTimeout());
+ Object res = fut.get();
- assertThat(res, equalTo("interrupt-task-data"));
+ assert "interrupt-task-data".equals(res) : "Invalid task result: " + res;
+
+ await = stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
// Wait for all jobs to finish.
- assertTrue(
- "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']',
- stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS)
- );
+ assert await :
+ "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']';
+
+ int cnt = interruptCnt[num].get();
- assertThat(interruptCnt[num].get(), equalTo(SPLIT_COUNT - 1));
+ assert cnt == SPLIT_COUNT - 1 : "Invalid interrupt count value: " + cnt;
}
/** */
@@ -209,11 +203,11 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
if (log.isInfoEnabled())
log.info("Splitting job [task=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
- assertNotNull(arg);
+ assert arg != null;
taskNum = (Integer)arg;
- assertThat(taskNum, not(equalTo(-1)));
+ assert taskNum != -1;
Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);
@@ -228,7 +222,7 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
/** {@inheritDoc} */
@Override public Object execute() {
- assertNotNull(taskSes);
+ assert taskSes != null;
thread = Thread.currentThread();
@@ -247,7 +241,7 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
if (log.isInfoEnabled())
log.info("Job one is proceeding [jobId=" + jobId + ']');
- assertNotNull(jobId);
+ assert jobId != null;
Collection<ComputeJobSibling> jobSiblings = taskSes.getJobSiblings();
@@ -276,7 +270,7 @@ public class GridSessionCancelSiblingsFromJobSelfTest extends GridCommonAbstract
/** {@inheritDoc} */
@Override public void cancel() {
- assertNotNull(thread);
+ assert thread != null;
interruptCnt[taskNum].incrementAndGet();
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java
index 6f41ea29be8..abb9dac732e 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCancelSiblingsFromTaskSelfTest.java
@@ -49,10 +49,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-
/**
* Session cancellation tests.
*/
@@ -96,14 +92,6 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
return c;
}
- /** {@inheritDoc} */
- @Override protected void beforeFirstTest() throws Exception {
- super.beforeFirstTest();
-
- // We are changing it because compute jobs fall asleep.
- assertTrue(computeJobWorkerInterruptTimeout(G.ignite(getTestIgniteInstanceName())).propagate(10L));
- }
-
/**
* @throws Exception If failed.
*/
@@ -126,16 +114,18 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
final AtomicBoolean failed = new AtomicBoolean(false);
- GridTestUtils.runMultiThreaded(() -> {
- int num = sNum.get();
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ int num = sNum.get();
- try {
- checkTask(num);
- }
- catch (Throwable e) {
- error("Failed to execute task.", e);
+ try {
+ checkTask(num);
+ }
+ catch (Throwable e) {
+ error("Failed to execute task.", e);
- failed.set(true);
+ failed.set(true);
+ }
}
}, EXEC_COUNT, "grid-session-test");
@@ -153,28 +143,32 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
ComputeTaskFuture<?> fut = executeAsync(ignite.compute(), GridTaskSessionTestTask.class, num);
- assertNotNull(fut);
+ assert fut != null;
try {
// Wait until jobs begin execution.
- assertTrue("Jobs did not start.", startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS));
+ boolean await = startSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
+
+ assert await : "Jobs did not start.";
- Object res = fut.get(getTestTimeout());
+ Object res = fut.get();
- assertThat(res, equalTo("interrupt-task-data"));
+ assert "interrupt-task-data".equals(res) : "Invalid task result: " + res;
// Wait for all jobs to finish.
- assertTrue(
- "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']',
- stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS)
- );
+ await = stopSignal[num].await(WAIT_TIME, TimeUnit.MILLISECONDS);
+
+ assert await :
+ "Jobs did not cancel [interruptCount=" + Arrays.toString(interruptCnt) + ']';
+
+ int cnt = interruptCnt[num].get();
- assertThat(interruptCnt[num].get(), equalTo(SPLIT_COUNT - 1));
+ assert cnt == SPLIT_COUNT - 1 : "Invalid interrupt count value: " + cnt;
}
finally {
// We must wait for the jobs to be sure that they have completed
// their execution since they use static variable (shared for the tests).
- fut.get(getTestTimeout());
+ fut.get();
}
}
@@ -214,11 +208,11 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
if (log.isInfoEnabled())
log.info("Splitting job [job=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
- assertNotNull(arg);
+ assert arg != null;
taskNum = (Integer)arg;
- assertThat(taskNum, not(equalTo(-1)));
+ assert taskNum != -1;
Collection<ComputeJob> jobs = new ArrayList<>(SPLIT_COUNT);
@@ -229,7 +223,7 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
/** {@inheritDoc} */
@Override public Serializable execute() {
- assertNotNull(taskSes);
+ assert taskSes != null;
thread = Thread.currentThread();
@@ -266,7 +260,7 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
/** {@inheritDoc} */
@Override public void cancel() {
- assertNotNull(thread);
+ assert thread != null;
interruptCnt[taskNum].incrementAndGet();
@@ -285,7 +279,7 @@ public class GridSessionCancelSiblingsFromTaskSelfTest extends GridCommonAbstrac
IgniteUuid jobId = received.get(0).getJobContext().getJobId();
- assertNotNull(jobId);
+ assert jobId != null;
// Cancel all jobs except first job with argument 1.
for (ComputeJobSibling jobSibling : jobSiblings) {
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 6ccab2ca553..38711a50edc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1962,62 +1962,25 @@ public final class GridTestUtils {
* @return {@code true} if condition was achieved, {@code false} otherwise.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
*/
- public static boolean waitForCondition(
- GridAbsPredicate cond,
- long timeout
- ) throws IgniteInterruptedCheckedException {
- return waitForCondition(cond, timeout, DFLT_BUSYWAIT_SLEEP_INTERVAL);
- }
-
- /**
- * Waits for condition, polling in busy wait loop.
- *
- * @param cond Condition to wait for.
- * @param timeout Max time to wait in milliseconds.
- * @return {@code true} if condition was achieved, {@code false} otherwise.
- * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
- */
- public static boolean waitForCondition(
- GridAbsPredicate cond,
- long timeout,
- long checkInterval
- ) throws IgniteInterruptedCheckedException {
+ public static boolean waitForCondition(GridAbsPredicate cond, long timeout) throws IgniteInterruptedCheckedException {
long endTime = U.currentTimeMillis() + timeout;
long endTime0 = endTime < 0 ? Long.MAX_VALUE : endTime;
- return waitForCondition(cond, () -> U.currentTimeMillis() < endTime0, checkInterval);
- }
-
- /**
- * @param cond Condition to wait for.
- * @param wait Wait predicate.
- * @return {@code true} if condition was achieved, {@code false} otherwise.
- * @throws IgniteInterruptedCheckedException If interrupted.
- */
- public static boolean waitForCondition(
- GridAbsPredicate cond,
- BooleanSupplier wait
- ) throws IgniteInterruptedCheckedException {
- return waitForCondition(cond, wait, DFLT_BUSYWAIT_SLEEP_INTERVAL);
+ return waitForCondition(cond, () -> U.currentTimeMillis() < endTime0);
}
/**
* @param cond Condition to wait for.
* @param wait Wait predicate.
* @return {@code true} if condition was achieved, {@code false} otherwise.
- * @param checkInterval Time interval between two consecutive condition checks.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- public static boolean waitForCondition(
- GridAbsPredicate cond,
- BooleanSupplier wait,
- long checkInterval
- ) throws IgniteInterruptedCheckedException {
+ public static boolean waitForCondition(GridAbsPredicate cond, BooleanSupplier wait) throws IgniteInterruptedCheckedException {
while (wait.getAsBoolean()) {
if (cond.apply())
return true;
- U.sleep(checkInterval);
+ U.sleep(DFLT_BUSYWAIT_SLEEP_INTERVAL);
}
return false;
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index dfeb9c2c1f9..16004989f20 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -684,7 +684,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
}
/** */
- protected void beforeFirstTest() throws Exception {
+ private void beforeFirstTest() throws Exception {
sharedStaticIpFinder = new TcpDiscoveryVmIpFinder(true);
clsLdr = Thread.currentThread().getContextClassLoader();
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 4b11deb3128..478137a9544 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.testframework.junits.common;
-import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
@@ -115,8 +114,6 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
-import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
-import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -160,8 +157,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.notNullValue;
/**
* Super class for all common tests.
@@ -2785,17 +2780,4 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
dbMgr.checkpointReadUnlock();
}
}
-
- /**
- * @param n Node.
- * @return Distributed property: {@code GridJobProcessor#computeJobWorkerInterruptTimeout}.
- */
- protected DistributedChangeableProperty<Serializable> computeJobWorkerInterruptTimeout(Ignite n) {
- DistributedChangeableProperty<Serializable> timeoutProperty = ((IgniteEx)n).context().distributedConfiguration()
- .property(GridJobProcessor.COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT);
-
- assertThat(timeoutProperty, notNullValue());
-
- return timeoutProperty;
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
index 4c172b398a5..c88ab8a708d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
@@ -77,7 +77,7 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr
@Override public void evaluate() throws Throwable {
GridSpiAbstractTest testClsInstance = (GridSpiAbstractTest)description.getTestClass().newInstance();
try {
- testClsInstance.beforeFirstTestInternal();
+ testClsInstance.beforeFirstTest();
base.evaluate();
}
@@ -157,7 +157,7 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr
}
/** */
- protected void beforeFirstTestInternal() throws Exception {
+ private void beforeFirstTest() throws Exception {
if (autoStart) {
GridSpiTest spiTest = GridTestUtils.getAnnotation(getClass(), GridSpiTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index f34166a6a09..8ad91a84bfa 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -85,7 +85,6 @@ import org.apache.ignite.internal.processors.compute.ComputeJobStatusTest;
import org.apache.ignite.internal.processors.compute.ComputeTaskWithWithoutFullSupportTest;
import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorConfigurationSelfTest;
import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
-import org.apache.ignite.internal.processors.compute.InterruptComputeJobTest;
import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
import org.apache.ignite.internal.util.StripedExecutorTest;
import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest;
@@ -182,8 +181,7 @@ import org.junit.runners.Suite;
ComputeGridMonitorTest.class,
ComputeJobChangePriorityTest.class,
ComputeJobStatusTest.class,
- ComputeTaskWithWithoutFullSupportTest.class,
- InterruptComputeJobTest.class
+ ComputeTaskWithWithoutFullSupportTest.class
})
public class IgniteComputeGridTestSuite {
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
index c8f828bb201..a70c9ec353f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
@@ -107,9 +107,6 @@ public class KillCommandsMXBeanTest extends GridCommonAbstractTest {
srvs.get(0).cluster().state(ACTIVE);
- // We change to reduce the waiting time for interrupting compute job.
- computeJobWorkerInterruptTimeout(srvs.get(0)).propagate(100L);
-
IgniteCache<Object, Object> cache = startCli.getOrCreateCache(
new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
index 65163ddaae1..7dae6b5af94 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
@@ -95,9 +95,6 @@ public class KillCommandsSQLTest extends GridCommonAbstractTest {
srvs.get(0).cluster().state(ACTIVE);
- // We change to reduce the waiting time for interrupting compute job.
- computeJobWorkerInterruptTimeout(srvs.get(0)).propagate(100L);
-
IgniteCache<Object, Object> cache = startCli.getOrCreateCache(
new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Integer.class)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));