You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2019/06/26 06:35:45 UTC
[ignite] branch master updated: IGNITE-11926: GridJobProcessor
metrics migration. (#6622)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9a746d1 IGNITE-11926: GridJobProcessor metrics migration. (#6622)
9a746d1 is described below
commit 9a746d15eba6be4e4f53fe1bf1ea05972c396560
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Jun 26 09:35:33 2019 +0300
IGNITE-11926: GridJobProcessor metrics migration. (#6622)
GridJobProcessor metrics migration to the new metrics subsystem.
---
.../org/apache/ignite/IgniteSystemProperties.java | 4 +
.../internal/processors/job/GridJobProcessor.java | 194 +++++++++--
.../processors/jobmetrics/GridJobMetrics.java | 4 +
.../jobmetrics/GridJobMetricsProcessor.java | 4 +
.../jobmetrics/GridJobMetricsSnapshot.java | 6 +-
.../processors/jobmetrics/package-info.java | 4 +-
.../jobmetrics/GridJobMetricsSelfTest.java | 355 +++++++++++++++++++++
.../testsuites/IgniteJobMetricsSelfTestSuite.java | 6 +-
8 files changed, 555 insertions(+), 22 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 0be44c0..bdba983 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.util.GridLogThrottle;
import org.apache.ignite.stream.StreamTransformer;
@@ -291,7 +292,10 @@ public final class IgniteSystemProperties {
* System property to override default job metrics processor property defining
* concurrency level for structure holding job metrics snapshots.
* Default value is {@code 64}.
+ *
+ * @deprecated Use {@link GridMetricManager} instead.
*/
+ @Deprecated
public static final String IGNITE_JOBS_METRICS_CONCURRENCY_LEVEL = "IGNITE_JOBS_METRICS_CONCURRENCY_LEVEL";
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 7cad893..751377f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -69,6 +69,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongMetricImpl;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
@@ -102,6 +104,7 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
/**
@@ -112,6 +115,33 @@ public class GridJobProcessor extends GridProcessorAdapter {
/** */
private static final int FINISHED_JOBS_COUNT = Integer.getInteger(IGNITE_JOBS_HISTORY_SIZE, 10240);
+ /** Metrics prefix. */
+ public static final String JOBS = metricName("compute", "jobs");
+
+ /** Started jobs metric name. */
+ public static final String STARTED = "Started";
+
+ /** Active jobs metric name. */
+ public static final String ACTIVE = "Active";
+
+ /** Waiting jobs metric name. */
+ public static final String WAITING = "Waiting";
+
+ /** Canceled jobs metric name. */
+ public static final String CANCELED = "Canceled";
+
+ /** Rejected jobs metric name. */
+ public static final String REJECTED = "Rejected";
+
+ /** Finished jobs metric name. */
+ public static final String FINISHED = "Finished";
+
+ /** Total jobs execution time metric name. */
+ public static final String EXECUTION_TIME = "ExecutionTime";
+
+ /** Total jobs waiting time metric name. */
+ public static final String WAITING_TIME = "WaitingTime";
+
/** */
private final Marshaller marsh;
@@ -156,26 +186,57 @@ public class GridJobProcessor extends GridProcessorAdapter {
private final GridLocalEventListener discoLsnr;
/** Needed for statistics. */
+ @Deprecated
private final LongAdder canceledJobsCnt = new LongAdder();
/** Needed for statistics. */
+ @Deprecated
private final LongAdder finishedJobsCnt = new LongAdder();
/** Needed for statistics. */
+ @Deprecated
private final LongAdder startedJobsCnt = new LongAdder();
/** Needed for statistics. */
+ @Deprecated
private final LongAdder rejectedJobsCnt = new LongAdder();
/** Total job execution time (unaccounted for in metrics). */
+ @Deprecated
private final LongAdder finishedJobsTime = new LongAdder();
/** Maximum job execution time for finished jobs. */
+ @Deprecated
private final GridAtomicLong maxFinishedJobsTime = new GridAtomicLong();
/** */
+ @Deprecated
private final AtomicLong metricsLastUpdateTstamp = new AtomicLong();
+ /** Number of started jobs. */
+ final LongMetricImpl startedJobsMetric;
+
+ /** Number of active jobs currently executing. */
+ final LongMetricImpl activeJobsMetric;
+
+ /** Number of currently queued jobs waiting to be executed. */
+ final LongMetricImpl waitingJobsMetric;
+
+ /** Number of cancelled jobs that are still running. */
+ final LongMetricImpl canceledJobsMetric;
+
+ /** Number of jobs rejected after more recent collision resolution operation. */
+ final LongMetricImpl rejectedJobsMetric;
+
+ /** Number of finished jobs. */
+ final LongMetricImpl finishedJobsMetric;
+
+ /** Total job execution time. */
+ final LongMetricImpl totalExecutionTimeMetric;
+
+ /** Total time jobs spent on waiting queue. */
+ final LongMetricImpl totalWaitTimeMetric;
+
/** */
private boolean stopping;
@@ -183,6 +244,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
private boolean cancelOnStop;
/** */
+ @Deprecated
private final long metricsUpdateFreq;
/** */
@@ -233,6 +295,25 @@ public class GridJobProcessor extends GridProcessorAdapter {
cancelLsnr = new JobCancelListener();
jobExecLsnr = new JobExecutionListener();
discoLsnr = new JobDiscoveryListener();
+
+ MetricRegistry mreg = ctx.metric().registry().withPrefix(JOBS);
+
+ startedJobsMetric = mreg.metric(STARTED, "Number of started jobs.");
+
+ activeJobsMetric = mreg.metric(ACTIVE, "Number of active jobs currently executing.");
+
+ waitingJobsMetric = mreg.metric(WAITING, "Number of currently queued jobs waiting to be executed.");
+
+ canceledJobsMetric = mreg.metric(CANCELED, "Number of cancelled jobs that are still running.");
+
+ rejectedJobsMetric = mreg.metric(REJECTED,
+ "Number of jobs rejected after more recent collision resolution operation.");
+
+ finishedJobsMetric = mreg.metric(FINISHED, "Number of finished jobs.");
+
+ totalExecutionTimeMetric = mreg.metric(EXECUTION_TIME, "Total execution time of jobs.");
+
+ totalWaitTimeMetric = mreg.metric(WAITING_TIME, "Total time jobs spent on waiting queue.");
}
/** {@inheritDoc} */
@@ -262,7 +343,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
@Override public void stop(boolean cancel) {
// Clear collections.
activeJobs.clear();
+
+ activeJobsMetric.reset();
+
cancelledJobs.clear();
+
cancelReqs = new GridBoundedConcurrentLinkedHashMap<>(FINISHED_JOBS_COUNT,
FINISHED_JOBS_COUNT < 128 ? FINISHED_JOBS_COUNT : 128,
0.75f, 16);
@@ -297,7 +382,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
// Rejected jobs.
if (!jobAlwaysActivate) {
for (GridJobWorker job : passiveJobs.values())
- if (passiveJobs.remove(job.getJobId(), job))
+ if (removeFromPassive(job))
rejectJob(job, false);
}
@@ -368,9 +453,12 @@ public class GridJobProcessor extends GridProcessorAdapter {
// We don't increment number of cancelled jobs if it
// was already cancelled.
- if (!job.isInternal() && !isCancelled)
+ if (!job.isInternal() && !isCancelled) {
canceledJobsCnt.increment();
+ canceledJobsMetric.increment();
+ }
+
job.cancel(sysCancel);
}
@@ -657,12 +745,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
private boolean cancelPassiveJob(GridJobWorker job) {
assert !jobAlwaysActivate;
- if (passiveJobs.remove(job.getJobId(), job)) {
+ if (removeFromPassive(job)) {
if (log.isDebugEnabled())
log.debug("Job has been cancelled before activation: " + job);
canceledJobsCnt.increment();
+ canceledJobsMetric.increment();
+
return true;
}
@@ -676,7 +766,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
* @param sys Flag indicating whether this is a system cancel.
*/
private void cancelActiveJob(GridJobWorker job, boolean sys) {
- if (activeJobs.remove(job.getJobId(), job)) {
+ if (removeFromActive(job)) {
cancelledJobs.put(job.getJobId(), job);
if (finishedJobs.contains(job.getJobId()))
@@ -689,6 +779,36 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
/**
+ * @param job Job to remove from {@code activeJobs}.
+ * @return {@code True} if the job was actually removed; the active jobs metric is decremented in that case.
+ */
+ private boolean removeFromActive(GridJobWorker job) {
+ boolean res = activeJobs.remove(job.getJobId(), job);
+
+ if (res)
+ activeJobsMetric.decrement();
+
+ return res;
+ }
+
+ /**
+ * @param job Job to remove from {@code passiveJobs}.
+ * @return {@code True} if the job was actually removed; in that case the waiting jobs metric is decremented and, unless {@code jobAlwaysActivate} is set, the job's queued time is added to the total wait time metric.
+ */
+ private boolean removeFromPassive(GridJobWorker job) {
+ boolean res = passiveJobs.remove(job.getJobId(), job);
+
+ if (res) {
+ waitingJobsMetric.decrement();
+
+ if (!jobAlwaysActivate)
+ totalWaitTimeMetric.add(job.getQueuedTime());
+ }
+
+ return res;
+ }
+
+ /**
* Handles collisions.
* <p>
* In most cases this method should be called from main read lock
@@ -866,8 +986,17 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
/**
+ * This method should be removed in Ignite 3.0.
*
+ * @deprecated Metrics calculated via new subsystem.
+ * @see #startedJobsMetric
+ * @see #activeJobsMetric
+ * @see #waitingJobsMetric
+ * @see #canceledJobsMetric
+ * @see #rejectedJobsMetric
+ * @see #finishedJobsMetric
*/
+ @Deprecated
private void updateJobMetrics() {
assert metricsUpdateFreq > 0L;
@@ -880,8 +1009,17 @@ public class GridJobProcessor extends GridProcessorAdapter {
}
/**
+ * This method should be removed in Ignite 3.0.
*
+ * @deprecated Metrics calculated via new subsystem.
+ * @see #startedJobsMetric
+ * @see #activeJobsMetric
+ * @see #waitingJobsMetric
+ * @see #canceledJobsMetric
+ * @see #rejectedJobsMetric
+ * @see #finishedJobsMetric
*/
+ @Deprecated
private void updateJobMetrics0() {
assert metricsUpdateFreq > 0L;
@@ -1136,9 +1274,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
// No sync execution.
job = null;
}
- else if (metricsUpdateFreq > -1L)
- // Job will be executed synchronously.
- startedJobsCnt.increment();
+ else {
+ if (metricsUpdateFreq > -1L)
+ // Job will be executed synchronously.
+ startedJobsCnt.increment();
+
+ startedJobsMetric.increment();
+ }
+
}
else
// Job has been cancelled.
@@ -1148,8 +1291,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
else {
GridJobWorker old = passiveJobs.putIfAbsent(job.getJobId(), job);
- if (old == null)
+ if (old == null) {
+ waitingJobsMetric.increment();
+
handleCollisions();
+ }
else
U.error(log, "Received computation request with duplicate job ID (could be " +
"network malfunction, source node may hang if task timeout was not set) " +
@@ -1237,6 +1383,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
activeJobs.put(jobWorker.getJobId(), jobWorker);
+ activeJobsMetric.increment();
+
// Check if job has been concurrently cancelled.
Boolean sysCancelled = cancelReqs.get(jobWorker.getSession().getId());
@@ -1246,7 +1394,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (sysCancelled != null) {
// Job has been concurrently cancelled.
// Remove from active jobs.
- activeJobs.remove(jobWorker.getJobId(), jobWorker);
+ removeFromActive(jobWorker);
// Even if job has been removed from another thread, we need to reject it
// here since job has never been executed.
@@ -1263,7 +1411,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
// However we need to check if master is alive before job will get
// its runner thread for proper master leave handling.
if (ctx.discovery().node(jobWorker.getTaskNode().id()) == null &&
- activeJobs.remove(jobWorker.getJobId(), jobWorker)) {
+ removeFromActive(jobWorker)) {
// Add to cancelled jobs.
cancelledJobs.put(jobWorker.getJobId(), jobWorker);
@@ -1303,11 +1451,13 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (metricsUpdateFreq > -1L)
startedJobsCnt.increment();
+ startedJobsMetric.increment();
+
return true;
}
catch (RejectedExecutionException e) {
// Remove from active jobs.
- activeJobs.remove(jobWorker.getJobId(), jobWorker);
+ removeFromActive(jobWorker);
// Even if job was removed from another thread, we need to reject it
// here since job has never been executed.
@@ -1317,6 +1467,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (metricsUpdateFreq > -1L)
rejectedJobsCnt.increment();
+ rejectedJobsMetric.increment();
+
jobWorker.finishJob(null, e2, true);
}
@@ -1645,7 +1797,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
@Override public boolean activate() {
GridJobWorker jobWorker = getJobWorker();
- return passiveJobs.remove(jobWorker.getJobId(), jobWorker) &&
+ return removeFromPassive(jobWorker) &&
onBeforeActivateJob(jobWorker) &&
executeAsync(jobWorker);
}
@@ -1660,17 +1812,19 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (passive) {
// If waiting job being rejected.
- if (passiveJobs.remove(jobWorker.getJobId(), jobWorker)) {
+ if (removeFromPassive(jobWorker)) {
rejectJob(jobWorker, true);
if (metricsUpdateFreq > -1L)
rejectedJobsCnt.increment();
+ rejectedJobsMetric.increment();
+
ret = true;
}
}
// If active job being cancelled.
- else if (activeJobs.remove(jobWorker.getJobId(), jobWorker)) {
+ else if (removeFromActive(jobWorker)) {
cancelledJobs.put(jobWorker.getJobId(), jobWorker);
if (finishedJobs.contains(jobWorker.getJobId()))
@@ -1784,19 +1938,23 @@ public class GridJobProcessor extends GridProcessorAdapter {
// reset once this job will be accounted for in metrics.
finishedJobsCnt.increment();
+ finishedJobsMetric.increment();
+
// Increment job execution time. This counter gets
// reset once this job will be accounted for in metrics.
long execTime = worker.getExecuteTime();
finishedJobsTime.add(execTime);
+ totalExecutionTimeMetric.add(execTime);
+
maxFinishedJobsTime.setIfGreater(execTime);
if (jobAlwaysActivate) {
if (metricsUpdateFreq > -1L)
updateJobMetrics();
- if (!activeJobs.remove(worker.getJobId(), worker))
+ if (!removeFromActive(worker))
cancelledJobs.remove(worker.getJobId(), worker);
heldJobs.remove(worker.getJobId());
@@ -1809,7 +1967,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
return;
}
- if (!activeJobs.remove(worker.getJobId(), worker))
+ if (!removeFromActive(worker))
cancelledJobs.remove(worker.getJobId(), worker);
heldJobs.remove(worker.getJobId());
@@ -1951,7 +2109,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (!jobAlwaysActivate) {
for (GridJobWorker job : passiveJobs.values()) {
if (job.getTaskNode().id().equals(nodeId)) {
- if (passiveJobs.remove(job.getJobId(), job))
+ if (removeFromPassive(job))
U.warn(log, "Task node left grid (job will not be activated) " +
"[nodeId=" + nodeId + ", jobSes=" + job.getSession() + ", job=" + job + ']');
}
@@ -1960,7 +2118,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
for (GridJobWorker job : activeJobs.values()) {
if (job.getTaskNode().id().equals(nodeId) && !job.isFinishing() &&
- activeJobs.remove(job.getJobId(), job)) {
+ removeFromActive(job)) {
// Add to cancelled jobs.
cancelledJobs.put(job.getJobId(), job);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetrics.java
index 1700ad9..ac8df4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetrics.java
@@ -17,11 +17,15 @@
package org.apache.ignite.internal.processors.jobmetrics;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Job metrics.
+ *
+ * @deprecated Use {@link GridMetricManager} instead.
*/
+@Deprecated
public class GridJobMetrics {
/** */
private int maxActiveJobs;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java
index 9f0ef4b..1ef6d7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteReducer;
@@ -30,7 +31,10 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_JOBS_METRICS_CONCU
/**
* Processes job metrics.
+ *
+ * @deprecated Use {@link GridMetricManager} instead.
*/
+@Deprecated
public class GridJobMetricsProcessor extends GridProcessorAdapter {
/** */
private static final int CONCURRENCY_LEVEL = Integer.getInteger(IGNITE_JOBS_METRICS_CONCURRENCY_LEVEL, 64);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSnapshot.java
index 7c27ece..3747ffa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSnapshot.java
@@ -17,12 +17,16 @@
package org.apache.ignite.internal.processors.jobmetrics;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Job metrics snapshot.
+ *
+ * @deprecated Use {@link GridMetricManager} instead.
*/
+@Deprecated
public class GridJobMetricsSnapshot {
/** */
private final long ts = U.currentTimeMillis();
@@ -225,4 +229,4 @@ public class GridJobMetricsSnapshot {
@Override public String toString() {
return S.toString(GridJobMetricsSnapshot.class, this);
}
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/package-info.java
index 6811afd..be94e73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/package-info.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/package-info.java
@@ -17,6 +17,6 @@
/**
* <!-- Package description. -->
- * TODO.
+ * Job metrics.
*/
-package org.apache.ignite.internal.processors.jobmetrics;
\ No newline at end of file
+package org.apache.ignite.internal.processors.jobmetrics;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSelfTest.java
new file mode 100644
index 0000000..23a65d5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSelfTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.jobmetrics;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.CollisionExternalListener;
+import org.apache.ignite.spi.collision.CollisionJobContext;
+import org.apache.ignite.spi.collision.CollisionSpi;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.ACTIVE;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.CANCELED;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.EXECUTION_TIME;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.FINISHED;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.REJECTED;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.STARTED;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.WAITING;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.WAITING_TIME;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for the compute job metrics registered by {@link GridJobProcessor}.
+ */
+public class GridJobMetricsSelfTest extends GridCommonAbstractTest {
+ /** */
+ public static final long TIMEOUT = 10_000;
+
+ /** */
+ private static volatile CountDownLatch latch;
+
+ /** Test correct calculation of rejected and waiting metrics of the {@link GridJobProcessor}. */
+ @Test
+ public void testGridJobWaitingRejectedMetrics() throws Exception {
+ latch = new CountDownLatch(1);
+
+ GridTestCollision collisioinSpi = new GridTestCollision();
+
+ IgniteConfiguration cfg = getConfiguration()
+ .setCollisionSpi(collisioinSpi);
+
+ try (IgniteEx g = startGrid(cfg)) {
+ ReadOnlyMetricRegistry mreg = g.context().metric().registry().withPrefix(JOBS);
+
+ LongMetric started = (LongMetric)mreg.findMetric(STARTED);
+ LongMetric active = (LongMetric)mreg.findMetric(ACTIVE);
+ LongMetric waiting = (LongMetric)mreg.findMetric(WAITING);
+ LongMetric canceled = (LongMetric)mreg.findMetric(CANCELED);
+ LongMetric rejected = (LongMetric)mreg.findMetric(REJECTED);
+ LongMetric finished = (LongMetric)mreg.findMetric(FINISHED);
+ LongMetric totalExecutionTime = (LongMetric)mreg.findMetric(EXECUTION_TIME);
+ LongMetric totalWaitingTime = (LongMetric)mreg.findMetric(WAITING_TIME);
+
+ assertNotNull(started);
+ assertNotNull(active);
+ assertNotNull(waiting);
+ assertNotNull(canceled);
+ assertNotNull(rejected);
+ assertNotNull(finished);
+ assertNotNull(totalExecutionTime);
+ assertNotNull(totalWaitingTime);
+
+ assertEquals(0, started.value());
+ assertEquals(0, active.value());
+ assertEquals(0, waiting.value());
+ assertEquals(0, canceled.value());
+ assertEquals(0, rejected.value());
+ assertEquals(0, finished.value());
+ assertEquals(0, totalExecutionTime.value());
+ assertEquals(0, totalWaitingTime.value());
+
+ SimplestTask task1 = new SimplestTask();
+ SimplestTask task2 = new SimplestTask();
+ SimplestTask task3 = new SimplestTask();
+
+ task1.block = true;
+ task2.block = true;
+ task3.block = true;
+
+ // Jobs will become "waiting", because the test CollisionSpi implementation never activates them by itself.
+ ComputeTaskFuture<?> fut1 = g.compute().executeAsync(task1, 1);
+ ComputeTaskFuture<?> fut2 = g.compute().executeAsync(task2, 1);
+ ComputeTaskFuture<?> fut3 = g.compute().executeAsync(task3, 1);
+
+ assertEquals(0, started.value());
+ assertEquals(0, active.value());
+ assertEquals(3, waiting.value());
+ assertEquals(0, canceled.value());
+ assertEquals(0, rejected.value());
+ assertEquals(0, finished.value());
+
+ // Activating 2 of 3 jobs. Rejecting 1 of them.
+ Iterator<CollisionJobContext> iter = collisioinSpi.jobs.values().iterator();
+
+ iter.next().cancel();
+
+ assertEquals(1, rejected.value());
+
+ Thread.sleep(100); // Sleeping to make sure totalWaitingTime will become more than zero.
+
+ iter.next().activate();
+ iter.next().activate();
+
+ boolean res = waitForCondition(() -> active.value() > 0, TIMEOUT);
+
+ assertTrue(res);
+ assertTrue("Waiting time should be greater then zero.", totalWaitingTime.value() > 0);
+
+ Thread.sleep(100); // Sleeping to make sure totalExecutionTime will become more than zero.
+
+ latch.countDown();
+
+ res = waitForCondition(() -> fut1.isDone() && fut2.isDone() && fut3.isDone(), TIMEOUT);
+
+ assertTrue(res);
+
+ res = waitForCondition(() -> finished.value() == 3, TIMEOUT);
+
+ assertTrue(res);
+
+ assertTrue("Execution time should be greater then zero.", totalExecutionTime.value() > 0);
+ }
+ }
+
+ /** Test correct calculation of finished, started, active, canceled metrics of the {@link GridJobProcessor}. */
+ @Test
+ public void testGridJobMetrics() throws Exception {
+ latch = new CountDownLatch(1);
+
+ try(IgniteEx g = startGrid(0)) {
+ ReadOnlyMetricRegistry mreg = g.context().metric().registry().withPrefix(JOBS);
+
+ LongMetric started = (LongMetric)mreg.findMetric(STARTED);
+ LongMetric active = (LongMetric)mreg.findMetric(ACTIVE);
+ LongMetric waiting = (LongMetric)mreg.findMetric(WAITING);
+ LongMetric canceled = (LongMetric)mreg.findMetric(CANCELED);
+ LongMetric rejected = (LongMetric)mreg.findMetric(REJECTED);
+ LongMetric finished = (LongMetric)mreg.findMetric(FINISHED);
+ LongMetric totalExecutionTime = (LongMetric)mreg.findMetric(EXECUTION_TIME);
+ LongMetric totalWaitingTime = (LongMetric)mreg.findMetric(WAITING_TIME);
+
+ assertNotNull(started);
+ assertNotNull(active);
+ assertNotNull(waiting);
+ assertNotNull(canceled);
+ assertNotNull(rejected);
+ assertNotNull(finished);
+ assertNotNull(totalExecutionTime);
+ assertNotNull(totalWaitingTime);
+
+ assertEquals(0, started.value());
+ assertEquals(0, active.value());
+ assertEquals(0, waiting.value());
+ assertEquals(0, canceled.value());
+ assertEquals(0, rejected.value());
+ assertEquals(0, finished.value());
+ assertEquals(0, totalExecutionTime.value());
+ assertEquals(0, totalWaitingTime.value());
+
+ SimplestTask task = new SimplestTask();
+
+ g.compute().execute(task, 1);
+
+ // Waiting task to finish.
+ boolean res = waitForCondition(() -> active.value() == 0, TIMEOUT);
+
+ assertTrue("Active = " + active.value(), res);
+
+ assertEquals(1, started.value());
+ assertEquals(0, waiting.value());
+ assertEquals(0, canceled.value());
+ assertEquals(0, rejected.value());
+ assertEquals(1, finished.value());
+
+ // Task should block until latch is down.
+ task.block = true;
+
+ ComputeTaskFuture<?> fut = g.compute().executeAsync(task, 1);
+
+ // Waiting task to start execution.
+ res = waitForCondition(() -> active.value() == 1, TIMEOUT);
+
+ assertTrue("Active = " + active.value(), res);
+
+ assertEquals(2, started.value());
+ assertEquals(0, waiting.value());
+ assertEquals(0, canceled.value());
+ assertEquals(0, rejected.value());
+ assertEquals(1, finished.value());
+
+ Thread.sleep(100); // Sleeping to make sure totalExecutionTime will become more than zero.
+
+ // After latch is down, task should finish.
+ latch.countDown();
+
+ fut.get(TIMEOUT);
+
+ res = waitForCondition(() -> active.value() == 0, TIMEOUT);
+
+ assertTrue("Active = " + active.value(), res);
+ assertTrue("Execution time should be greater then zero.", totalExecutionTime.value() > 0);
+
+ assertEquals(2, finished.value());
+
+ latch = new CountDownLatch(1);
+
+ fut = g.compute().executeAsync(task, 1);
+
+ res = waitForCondition(() -> active.value() == 1, TIMEOUT);
+
+ assertTrue("Active = " + active.value(), res);
+
+ assertEquals(3, started.value());
+ assertEquals(0, waiting.value());
+ assertEquals(0, canceled.value());
+ assertEquals(0, rejected.value());
+ assertEquals(2, finished.value());
+
+ // First cancel task, then allow it to finish.
+ fut.cancel();
+
+ latch.countDown();
+
+ res = waitForCondition(() -> active.value() == 0, TIMEOUT);
+
+ assertTrue("Active = " + active.value(), res);
+
+ assertEquals(3, started.value());
+ assertEquals(0, waiting.value());
+ assertEquals(1, canceled.value());
+ assertEquals(0, rejected.value());
+
+ res = waitForCondition(() -> finished.value() == 3, TIMEOUT);
+
+ assertTrue("Finished = " + finished.value(), res);
+ }
+ }
+
+ /** Test job that returns {@code "1"}, optionally after waiting on the test class's static {@code latch}. */
+ private static class SimplestJob implements ComputeJob {
+ /** If {@code true}, {@link #execute()} waits on the test latch before returning. */
+ private final boolean block;
+
+ /** @param block Whether {@link #execute()} should wait on the test latch before returning. */
+ public SimplestJob(boolean block) {
+ this.block = block;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ if (block) {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ return "1";
+ }
+ }
+
+ /** Test task that maps one {@link SimplestJob} to every node of the subgrid and reduces to {@code "1"}. */
+ private static class SimplestTask extends ComputeTaskAdapter<Object, Object> {
+ /** Value passed to each created {@link SimplestJob} as its {@code block} flag. */
+ volatile boolean block;
+
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) throws IgniteException {
+ Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new SimplestJob(block), node);
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
+ return "1";
+ }
+ }
+
+ /** Test collision SPI that only records waiting jobs; it never activates or rejects them by itself. */
+ @IgniteSpiMultipleInstancesSupport(true)
+ public static class GridTestCollision extends IgniteSpiAdapter implements CollisionSpi {
+ /** Contexts of the waiting jobs seen by {@link #onCollision(CollisionContext)}, keyed by job. */
+ HashMap<ComputeJob, CollisionJobContext> jobs = new HashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void onCollision(CollisionContext ctx) {
+ for (CollisionJobContext jobCtx : ctx.waitingJobs())
+ jobs.put(jobCtx.getJob(), jobCtx);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStop() throws IgniteSpiException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setExternalCollisionListener(CollisionExternalListener lsnr) {
+ // No-op.
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteJobMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteJobMetricsSelfTestSuite.java
index 818f0de..fe9a2fe 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteJobMetricsSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteJobMetricsSelfTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessorLoadTest;
+import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSelfTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -25,6 +26,9 @@ import org.junit.runners.Suite;
* Job metrics self test suite.
*/
@RunWith(Suite.class)
-@Suite.SuiteClasses({GridJobMetricsProcessorLoadTest.class})
+@Suite.SuiteClasses({
+ GridJobMetricsProcessorLoadTest.class,
+ GridJobMetricsSelfTest.class,
+})
public class IgniteJobMetricsSelfTestSuite {
}