You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2019/06/26 06:35:45 UTC

[ignite] branch master updated: IGNITE-11926: GridJobProcessor metrics migration. (#6622)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a746d1  IGNITE-11926: GridJobProcessor metrics migration. (#6622)
9a746d1 is described below

commit 9a746d15eba6be4e4f53fe1bf1ea05972c396560
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Jun 26 09:35:33 2019 +0300

    IGNITE-11926: GridJobProcessor metrics migration. (#6622)
    
    GridJobProcessor metrics migration to the new metrics subsystem.
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   4 +
 .../internal/processors/job/GridJobProcessor.java  | 194 +++++++++--
 .../processors/jobmetrics/GridJobMetrics.java      |   4 +
 .../jobmetrics/GridJobMetricsProcessor.java        |   4 +
 .../jobmetrics/GridJobMetricsSnapshot.java         |   6 +-
 .../processors/jobmetrics/package-info.java        |   4 +-
 .../jobmetrics/GridJobMetricsSelfTest.java         | 355 +++++++++++++++++++++
 .../testsuites/IgniteJobMetricsSelfTestSuite.java  |   6 +-
 8 files changed, 555 insertions(+), 22 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 0be44c0..bdba983 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.processors.rest.GridRestCommand;
 import org.apache.ignite.internal.util.GridLogThrottle;
 import org.apache.ignite.stream.StreamTransformer;
@@ -291,7 +292,10 @@ public final class IgniteSystemProperties {
      * System property to override default job metrics processor property defining
      * concurrency level for structure holding job metrics snapshots.
      * Default value is {@code 64}.
+     *
+     * @deprecated Use {@link GridMetricManager} instead.
      */
+    @Deprecated
     public static final String IGNITE_JOBS_METRICS_CONCURRENCY_LEVEL = "IGNITE_JOBS_METRICS_CONCURRENCY_LEVEL";
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 7cad893..751377f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -69,6 +69,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongMetricImpl;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
@@ -102,6 +104,7 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
 /**
@@ -112,6 +115,33 @@ public class GridJobProcessor extends GridProcessorAdapter {
     /** */
     private static final int FINISHED_JOBS_COUNT = Integer.getInteger(IGNITE_JOBS_HISTORY_SIZE, 10240);
 
+    /** Metrics prefix. */
+    public static final String JOBS = metricName("compute", "jobs");
+
+    /** Started jobs metric name. */
+    public static final String STARTED = "Started";
+
+    /** Active jobs metric name. */
+    public static final String ACTIVE = "Active";
+
+    /** Waiting jobs metric name. */
+    public static final String WAITING = "Waiting";
+
+    /** Canceled jobs metric name. */
+    public static final String CANCELED = "Canceled";
+
+    /** Rejected jobs metric name. */
+    public static final String REJECTED = "Rejected";
+
+    /** Finished jobs metric name. */
+    public static final String FINISHED = "Finished";
+
+    /** Total jobs execution time metric name. */
+    public static final String EXECUTION_TIME = "ExecutionTime";
+
+    /** Total jobs waiting time metric name. */
+    public static final String WAITING_TIME = "WaitingTime";
+
     /** */
     private final Marshaller marsh;
 
@@ -156,26 +186,57 @@ public class GridJobProcessor extends GridProcessorAdapter {
     private final GridLocalEventListener discoLsnr;
 
     /** Needed for statistics. */
+    @Deprecated
     private final LongAdder canceledJobsCnt = new LongAdder();
 
     /** Needed for statistics. */
+    @Deprecated
     private final LongAdder finishedJobsCnt = new LongAdder();
 
     /** Needed for statistics. */
+    @Deprecated
     private final LongAdder startedJobsCnt = new LongAdder();
 
     /** Needed for statistics. */
+    @Deprecated
     private final LongAdder rejectedJobsCnt = new LongAdder();
 
     /** Total job execution time (unaccounted for in metrics). */
+    @Deprecated
     private final LongAdder finishedJobsTime = new LongAdder();
 
     /** Maximum job execution time for finished jobs. */
+    @Deprecated
     private final GridAtomicLong maxFinishedJobsTime = new GridAtomicLong();
 
     /** */
+    @Deprecated
     private final AtomicLong metricsLastUpdateTstamp = new AtomicLong();
 
+    /** Number of started jobs. */
+    final LongMetricImpl startedJobsMetric;
+
+    /** Number of active jobs currently executing. */
+    final LongMetricImpl activeJobsMetric;
+
+    /** Number of currently queued jobs waiting to be executed. */
+    final LongMetricImpl waitingJobsMetric;
+
+    /** Number of cancelled jobs that are still running. */
+    final LongMetricImpl canceledJobsMetric;
+
+    /** Number of jobs rejected after the most recent collision resolution operation. */
+    final LongMetricImpl rejectedJobsMetric;
+
+    /** Number of finished jobs. */
+    final LongMetricImpl finishedJobsMetric;
+
+    /** Total job execution time. */
+    final LongMetricImpl totalExecutionTimeMetric;
+
+    /** Total time jobs spent in the waiting queue. */
+    final LongMetricImpl totalWaitTimeMetric;
+
     /** */
     private boolean stopping;
 
@@ -183,6 +244,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
     private boolean cancelOnStop;
 
     /** */
+    @Deprecated
     private final long metricsUpdateFreq;
 
     /** */
@@ -233,6 +295,25 @@ public class GridJobProcessor extends GridProcessorAdapter {
         cancelLsnr = new JobCancelListener();
         jobExecLsnr = new JobExecutionListener();
         discoLsnr = new JobDiscoveryListener();
+
+        MetricRegistry mreg = ctx.metric().registry().withPrefix(JOBS);
+
+        startedJobsMetric = mreg.metric(STARTED, "Number of started jobs.");
+
+        activeJobsMetric = mreg.metric(ACTIVE, "Number of active jobs currently executing.");
+
+        waitingJobsMetric = mreg.metric(WAITING, "Number of currently queued jobs waiting to be executed.");
+
+        canceledJobsMetric = mreg.metric(CANCELED, "Number of cancelled jobs that are still running.");
+
+        rejectedJobsMetric = mreg.metric(REJECTED,
+            "Number of jobs rejected after more recent collision resolution operation.");
+
+        finishedJobsMetric = mreg.metric(FINISHED, "Number of finished jobs.");
+
+        totalExecutionTimeMetric = mreg.metric(EXECUTION_TIME, "Total execution time of jobs.");
+
+        totalWaitTimeMetric = mreg.metric(WAITING_TIME, "Total time jobs spent on waiting queue.");
     }
 
     /** {@inheritDoc} */
@@ -262,7 +343,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
     @Override public void stop(boolean cancel) {
         // Clear collections.
         activeJobs.clear();
+
+        activeJobsMetric.reset();
+
         cancelledJobs.clear();
+
         cancelReqs = new GridBoundedConcurrentLinkedHashMap<>(FINISHED_JOBS_COUNT,
             FINISHED_JOBS_COUNT < 128 ? FINISHED_JOBS_COUNT : 128,
             0.75f, 16);
@@ -297,7 +382,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         // Rejected jobs.
         if (!jobAlwaysActivate) {
             for (GridJobWorker job : passiveJobs.values())
-                if (passiveJobs.remove(job.getJobId(), job))
+                if (removeFromPassive(job))
                     rejectJob(job, false);
         }
 
@@ -368,9 +453,12 @@ public class GridJobProcessor extends GridProcessorAdapter {
 
         // We don't increment number of cancelled jobs if it
         // was already cancelled.
-        if (!job.isInternal() && !isCancelled)
+        if (!job.isInternal() && !isCancelled) {
             canceledJobsCnt.increment();
 
+            canceledJobsMetric.increment();
+        }
+
         job.cancel(sysCancel);
     }
 
@@ -657,12 +745,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
     private boolean cancelPassiveJob(GridJobWorker job) {
         assert !jobAlwaysActivate;
 
-        if (passiveJobs.remove(job.getJobId(), job)) {
+        if (removeFromPassive(job)) {
             if (log.isDebugEnabled())
                 log.debug("Job has been cancelled before activation: " + job);
 
             canceledJobsCnt.increment();
 
+            canceledJobsMetric.increment();
+
             return true;
         }
 
@@ -676,7 +766,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      * @param sys Flag indicating whether this is a system cancel.
      */
     private void cancelActiveJob(GridJobWorker job, boolean sys) {
-        if (activeJobs.remove(job.getJobId(), job)) {
+        if (removeFromActive(job)) {
             cancelledJobs.put(job.getJobId(), job);
 
             if (finishedJobs.contains(job.getJobId()))
@@ -689,6 +779,36 @@ public class GridJobProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param job Job to remove.
+     * @return {@code True} if the job was actually removed.
+     */
+    private boolean removeFromActive(GridJobWorker job) {
+        boolean res = activeJobs.remove(job.getJobId(), job);
+
+        if (res)
+            activeJobsMetric.decrement();
+
+        return res;
+    }
+
+    /**
+     * @param job Job to remove.
+     * @return {@code True} if the job was actually removed.
+     */
+    private boolean removeFromPassive(GridJobWorker job) {
+        boolean res = passiveJobs.remove(job.getJobId(), job);
+
+        if (res) {
+            waitingJobsMetric.decrement();
+
+            if (!jobAlwaysActivate)
+                totalWaitTimeMetric.add(job.getQueuedTime());
+        }
+
+        return res;
+    }
+
+    /**
      * Handles collisions.
      * <p>
      * In most cases this method should be called from main read lock
@@ -866,8 +986,17 @@ public class GridJobProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * This method should be removed in Ignite 3.0.
      *
+     * @deprecated Metrics are now calculated via the new metrics subsystem.
+     * @see #startedJobsMetric
+     * @see #activeJobsMetric
+     * @see #waitingJobsMetric
+     * @see #canceledJobsMetric
+     * @see #rejectedJobsMetric
+     * @see #finishedJobsMetric
      */
+    @Deprecated
     private void updateJobMetrics() {
         assert metricsUpdateFreq > 0L;
 
@@ -880,8 +1009,17 @@ public class GridJobProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * This method should be removed in Ignite 3.0.
      *
+     * @deprecated Metrics are now calculated via the new metrics subsystem.
+     * @see #startedJobsMetric
+     * @see #activeJobsMetric
+     * @see #waitingJobsMetric
+     * @see #canceledJobsMetric
+     * @see #rejectedJobsMetric
+     * @see #finishedJobsMetric
      */
+    @Deprecated
     private void updateJobMetrics0() {
         assert metricsUpdateFreq > 0L;
 
@@ -1136,9 +1274,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
                                     // No sync execution.
                                     job = null;
                                 }
-                                else if (metricsUpdateFreq > -1L)
-                                    // Job will be executed synchronously.
-                                    startedJobsCnt.increment();
+                                else {
+                                    if (metricsUpdateFreq > -1L)
+                                        // Job will be executed synchronously.
+                                        startedJobsCnt.increment();
+
+                                    startedJobsMetric.increment();
+                                }
+
                             }
                             else
                                 // Job has been cancelled.
@@ -1148,8 +1291,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
                         else {
                             GridJobWorker old = passiveJobs.putIfAbsent(job.getJobId(), job);
 
-                            if (old == null)
+                            if (old == null) {
+                                waitingJobsMetric.increment();
+
                                 handleCollisions();
+                            }
                             else
                                 U.error(log, "Received computation request with duplicate job ID (could be " +
                                     "network malfunction, source node may hang if task timeout was not set) " +
@@ -1237,6 +1383,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
 
         activeJobs.put(jobWorker.getJobId(), jobWorker);
 
+        activeJobsMetric.increment();
+
         // Check if job has been concurrently cancelled.
         Boolean sysCancelled = cancelReqs.get(jobWorker.getSession().getId());
 
@@ -1246,7 +1394,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         if (sysCancelled != null) {
             // Job has been concurrently cancelled.
             // Remove from active jobs.
-            activeJobs.remove(jobWorker.getJobId(), jobWorker);
+            removeFromActive(jobWorker);
 
             // Even if job has been removed from another thread, we need to reject it
             // here since job has never been executed.
@@ -1263,7 +1411,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         // However we need to check if master is alive before job will get
         // its runner thread for proper master leave handling.
         if (ctx.discovery().node(jobWorker.getTaskNode().id()) == null &&
-            activeJobs.remove(jobWorker.getJobId(), jobWorker)) {
+            removeFromActive(jobWorker)) {
             // Add to cancelled jobs.
             cancelledJobs.put(jobWorker.getJobId(), jobWorker);
 
@@ -1303,11 +1451,13 @@ public class GridJobProcessor extends GridProcessorAdapter {
             if (metricsUpdateFreq > -1L)
                 startedJobsCnt.increment();
 
+            startedJobsMetric.increment();
+
             return true;
         }
         catch (RejectedExecutionException e) {
             // Remove from active jobs.
-            activeJobs.remove(jobWorker.getJobId(), jobWorker);
+            removeFromActive(jobWorker);
 
             // Even if job was removed from another thread, we need to reject it
             // here since job has never been executed.
@@ -1317,6 +1467,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
             if (metricsUpdateFreq > -1L)
                 rejectedJobsCnt.increment();
 
+            rejectedJobsMetric.increment();
+
             jobWorker.finishJob(null, e2, true);
         }
 
@@ -1645,7 +1797,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         @Override public boolean activate() {
             GridJobWorker jobWorker = getJobWorker();
 
-            return passiveJobs.remove(jobWorker.getJobId(), jobWorker) &&
+            return removeFromPassive(jobWorker) &&
                 onBeforeActivateJob(jobWorker) &&
                 executeAsync(jobWorker);
         }
@@ -1660,17 +1812,19 @@ public class GridJobProcessor extends GridProcessorAdapter {
 
             if (passive) {
                 // If waiting job being rejected.
-                if (passiveJobs.remove(jobWorker.getJobId(), jobWorker)) {
+                if (removeFromPassive(jobWorker)) {
                     rejectJob(jobWorker, true);
 
                     if (metricsUpdateFreq > -1L)
                         rejectedJobsCnt.increment();
 
+                    rejectedJobsMetric.increment();
+
                     ret = true;
                 }
             }
             // If active job being cancelled.
-            else if (activeJobs.remove(jobWorker.getJobId(), jobWorker)) {
+            else if (removeFromActive(jobWorker)) {
                 cancelledJobs.put(jobWorker.getJobId(), jobWorker);
 
                 if (finishedJobs.contains(jobWorker.getJobId()))
@@ -1784,19 +1938,23 @@ public class GridJobProcessor extends GridProcessorAdapter {
                 // reset once this job will be accounted for in metrics.
                 finishedJobsCnt.increment();
 
+                finishedJobsMetric.increment();
+
                 // Increment job execution time. This counter gets
                 // reset once this job will be accounted for in metrics.
                 long execTime = worker.getExecuteTime();
 
                 finishedJobsTime.add(execTime);
 
+                totalExecutionTimeMetric.add(execTime);
+
                 maxFinishedJobsTime.setIfGreater(execTime);
 
                 if (jobAlwaysActivate) {
                     if (metricsUpdateFreq > -1L)
                         updateJobMetrics();
 
-                    if (!activeJobs.remove(worker.getJobId(), worker))
+                    if (!removeFromActive(worker))
                         cancelledJobs.remove(worker.getJobId(), worker);
 
                     heldJobs.remove(worker.getJobId());
@@ -1809,7 +1967,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                         return;
                     }
 
-                    if (!activeJobs.remove(worker.getJobId(), worker))
+                    if (!removeFromActive(worker))
                         cancelledJobs.remove(worker.getJobId(), worker);
 
                     heldJobs.remove(worker.getJobId());
@@ -1951,7 +2109,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                     if (!jobAlwaysActivate) {
                         for (GridJobWorker job : passiveJobs.values()) {
                             if (job.getTaskNode().id().equals(nodeId)) {
-                                if (passiveJobs.remove(job.getJobId(), job))
+                                if (removeFromPassive(job))
                                     U.warn(log, "Task node left grid (job will not be activated) " +
                                         "[nodeId=" + nodeId + ", jobSes=" + job.getSession() + ", job=" + job + ']');
                             }
@@ -1960,7 +2118,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
 
                     for (GridJobWorker job : activeJobs.values()) {
                         if (job.getTaskNode().id().equals(nodeId) && !job.isFinishing() &&
-                            activeJobs.remove(job.getJobId(), job)) {
+                            removeFromActive(job)) {
                             // Add to cancelled jobs.
                             cancelledJobs.put(job.getJobId(), job);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetrics.java
index 1700ad9..ac8df4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetrics.java
@@ -17,11 +17,15 @@
 
 package org.apache.ignite.internal.processors.jobmetrics;
 
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Job metrics.
+ *
+ * @deprecated Use {@link GridMetricManager} instead.
  */
+@Deprecated
 public class GridJobMetrics {
     /** */
     private int maxActiveJobs;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java
index 9f0ef4b..1ef6d7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsProcessor.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteReducer;
@@ -30,7 +31,10 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_JOBS_METRICS_CONCU
 
 /**
  * Processes job metrics.
+ *
+ * @deprecated Use {@link GridMetricManager} instead.
  */
+@Deprecated
 public class GridJobMetricsProcessor extends GridProcessorAdapter {
     /** */
     private static final int CONCURRENCY_LEVEL = Integer.getInteger(IGNITE_JOBS_METRICS_CONCURRENCY_LEVEL, 64);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSnapshot.java
index 7c27ece..3747ffa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSnapshot.java
@@ -17,12 +17,16 @@
 
 package org.apache.ignite.internal.processors.jobmetrics;
 
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Job metrics snapshot.
+ *
+ * @deprecated Use {@link GridMetricManager} instead.
  */
+@Deprecated
 public class GridJobMetricsSnapshot {
     /** */
     private final long ts = U.currentTimeMillis();
@@ -225,4 +229,4 @@ public class GridJobMetricsSnapshot {
     @Override public String toString() {
         return S.toString(GridJobMetricsSnapshot.class, this);
     }
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/package-info.java
index 6811afd..be94e73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/package-info.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/jobmetrics/package-info.java
@@ -17,6 +17,6 @@
 
 /**
  * <!-- Package description. -->
- * TODO.
+ * Job metrics.
  */
-package org.apache.ignite.internal.processors.jobmetrics;
\ No newline at end of file
+package org.apache.ignite.internal.processors.jobmetrics;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSelfTest.java
new file mode 100644
index 0000000..23a65d5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/jobmetrics/GridJobMetricsSelfTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.jobmetrics;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.CollisionExternalListener;
+import org.apache.ignite.spi.collision.CollisionJobContext;
+import org.apache.ignite.spi.collision.CollisionSpi;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.ACTIVE;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.CANCELED;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.EXECUTION_TIME;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.FINISHED;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.REJECTED;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.STARTED;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.WAITING;
+import static org.apache.ignite.internal.processors.job.GridJobProcessor.WAITING_TIME;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for the job metrics exposed by {@link GridJobProcessor}.
+ */
+public class GridJobMetricsSelfTest extends GridCommonAbstractTest {
+    /** */
+    public static final long TIMEOUT = 10_000;
+
+    /** */
+    private static volatile CountDownLatch latch;
+
+    /** Test correct calculation of rejected and waiting metrics of the {@link GridJobProcessor}. */
+    @Test
+    public void testGridJobWaitingRejectedMetrics() throws Exception {
+        latch = new CountDownLatch(1);
+
+        GridTestCollision collisioinSpi = new GridTestCollision();
+
+        IgniteConfiguration cfg = getConfiguration()
+            .setCollisionSpi(collisioinSpi);
+
+        try (IgniteEx g = startGrid(cfg)) {
+            ReadOnlyMetricRegistry mreg = g.context().metric().registry().withPrefix(JOBS);
+
+            LongMetric started = (LongMetric)mreg.findMetric(STARTED);
+            LongMetric active = (LongMetric)mreg.findMetric(ACTIVE);
+            LongMetric waiting = (LongMetric)mreg.findMetric(WAITING);
+            LongMetric canceled = (LongMetric)mreg.findMetric(CANCELED);
+            LongMetric rejected = (LongMetric)mreg.findMetric(REJECTED);
+            LongMetric finished = (LongMetric)mreg.findMetric(FINISHED);
+            LongMetric totalExecutionTime = (LongMetric)mreg.findMetric(EXECUTION_TIME);
+            LongMetric totalWaitingTime = (LongMetric)mreg.findMetric(WAITING_TIME);
+
+            assertNotNull(started);
+            assertNotNull(active);
+            assertNotNull(waiting);
+            assertNotNull(canceled);
+            assertNotNull(rejected);
+            assertNotNull(finished);
+            assertNotNull(totalExecutionTime);
+            assertNotNull(totalWaitingTime);
+
+            assertEquals(0, started.value());
+            assertEquals(0, active.value());
+            assertEquals(0, waiting.value());
+            assertEquals(0, canceled.value());
+            assertEquals(0, rejected.value());
+            assertEquals(0, finished.value());
+            assertEquals(0, totalExecutionTime.value());
+            assertEquals(0, totalWaitingTime.value());
+
+            SimplestTask task1 = new SimplestTask();
+            SimplestTask task2 = new SimplestTask();
+            SimplestTask task3 = new SimplestTask();
+
+            task1.block = true;
+            task2.block = true;
+            task3.block = true;
+
+            // Tasks will become "waiting" because of the CollisionSpi implementation.
+            ComputeTaskFuture<?> fut1 = g.compute().executeAsync(task1, 1);
+            ComputeTaskFuture<?> fut2 = g.compute().executeAsync(task2, 1);
+            ComputeTaskFuture<?> fut3 = g.compute().executeAsync(task3, 1);
+
+            assertEquals(0, started.value());
+            assertEquals(0, active.value());
+            assertEquals(3, waiting.value());
+            assertEquals(0, canceled.value());
+            assertEquals(0, rejected.value());
+            assertEquals(0, finished.value());
+
+            // Rejecting 1 of the 3 jobs, then activating the other 2.
+            Iterator<CollisionJobContext> iter = collisioinSpi.jobs.values().iterator();
+
+            iter.next().cancel();
+
+            assertEquals(1, rejected.value());
+
+            Thread.sleep(100); // Sleeping to make sure totalWaitingTime will become more than zero.
+
+            iter.next().activate();
+            iter.next().activate();
+
+            boolean res = waitForCondition(() -> active.value() > 0, TIMEOUT);
+
+            assertTrue(res);
+            assertTrue("Waiting time should be greater then zero.", totalWaitingTime.value() > 0);
+
+            Thread.sleep(100); // Sleeping to make sure totalExecutionTime will become more than zero.
+
+            latch.countDown();
+
+            res = waitForCondition(() -> fut1.isDone() && fut2.isDone() && fut3.isDone(), TIMEOUT);
+
+            assertTrue(res);
+
+            res = waitForCondition(() -> finished.value() == 3, TIMEOUT);
+
+            assertTrue(res);
+
+            assertTrue("Execution time should be greater then zero.", totalExecutionTime.value() > 0);
+        }
+    }
+
+    /** Test correct calculation of finished, started, active, canceled metrics of the {@link GridJobProcessor}. */
+    @Test
+    public void testGridJobMetrics() throws Exception {
+        latch = new CountDownLatch(1);
+
+        try(IgniteEx g = startGrid(0)) {
+            ReadOnlyMetricRegistry mreg = g.context().metric().registry().withPrefix(JOBS);
+
+            LongMetric started = (LongMetric)mreg.findMetric(STARTED);
+            LongMetric active = (LongMetric)mreg.findMetric(ACTIVE);
+            LongMetric waiting = (LongMetric)mreg.findMetric(WAITING);
+            LongMetric canceled = (LongMetric)mreg.findMetric(CANCELED);
+            LongMetric rejected = (LongMetric)mreg.findMetric(REJECTED);
+            LongMetric finished = (LongMetric)mreg.findMetric(FINISHED);
+            LongMetric totalExecutionTime = (LongMetric)mreg.findMetric(EXECUTION_TIME);
+            LongMetric totalWaitingTime = (LongMetric)mreg.findMetric(WAITING_TIME);
+
+            assertNotNull(started);
+            assertNotNull(active);
+            assertNotNull(waiting);
+            assertNotNull(canceled);
+            assertNotNull(rejected);
+            assertNotNull(finished);
+            assertNotNull(totalExecutionTime);
+            assertNotNull(totalWaitingTime);
+
+            assertEquals(0, started.value());
+            assertEquals(0, active.value());
+            assertEquals(0, waiting.value());
+            assertEquals(0, canceled.value());
+            assertEquals(0, rejected.value());
+            assertEquals(0, finished.value());
+            assertEquals(0, totalExecutionTime.value());
+            assertEquals(0, totalWaitingTime.value());
+
+            SimplestTask task = new SimplestTask();
+
+            g.compute().execute(task, 1);
+
+            // Waiting for the task to finish.
+            boolean res = waitForCondition(() -> active.value() == 0, TIMEOUT);
+
+            assertTrue("Active = " + active.value(), res);
+
+            assertEquals(1, started.value());
+            assertEquals(0, waiting.value());
+            assertEquals(0, canceled.value());
+            assertEquals(0, rejected.value());
+            assertEquals(1, finished.value());
+
+            // Task should block until latch is down.
+            task.block = true;
+
+            ComputeTaskFuture<?> fut = g.compute().executeAsync(task, 1);
+
+            // Waiting for the task to start execution.
+            res = waitForCondition(() -> active.value() == 1, TIMEOUT);
+
+            assertTrue("Active = " + active.value(), res);
+
+            assertEquals(2, started.value());
+            assertEquals(0, waiting.value());
+            assertEquals(0, canceled.value());
+            assertEquals(0, rejected.value());
+            assertEquals(1, finished.value());
+
+            Thread.sleep(100); // Sleeping to make sure totalExecutionTime will become more than zero.
+
+            // After latch is down, task should finish.
+            latch.countDown();
+
+            fut.get(TIMEOUT);
+
+            res = waitForCondition(() -> active.value() == 0, TIMEOUT);
+
+            assertTrue("Active = " + active.value(), res);
+            assertTrue("Execution time should be greater then zero.", totalExecutionTime.value() > 0);
+
+            assertEquals(2, finished.value());
+
+            latch = new CountDownLatch(1);
+
+            fut = g.compute().executeAsync(task, 1);
+
+            res = waitForCondition(() -> active.value() == 1, TIMEOUT);
+
+            assertTrue("Active = " + active.value(), res);
+
+            assertEquals(3, started.value());
+            assertEquals(0, waiting.value());
+            assertEquals(0, canceled.value());
+            assertEquals(0, rejected.value());
+            assertEquals(2, finished.value());
+
+            // First cancel task, then allow it to finish.
+            fut.cancel();
+
+            latch.countDown();
+
+            res = waitForCondition(() -> active.value() == 0, TIMEOUT);
+
+            assertTrue("Active = " + active.value(), res);
+
+            assertEquals(3, started.value());
+            assertEquals(0, waiting.value());
+            assertEquals(1, canceled.value());
+            assertEquals(0, rejected.value());
+
+            res = waitForCondition(() -> finished.value() == 3, TIMEOUT);
+
+            assertTrue("Finished = " + finished.value(), res);
+        }
+    }
+
+    /** */
+    private static class SimplestJob implements ComputeJob {
+        /** */
+        private final boolean block;
+
+        /** */
+        public SimplestJob(boolean block) {
+            this.block = block;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            if (block) {
+                try {
+                    latch.await();
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            return "1";
+        }
+    }
+
+    /** */
+    private static class SimplestTask extends ComputeTaskAdapter<Object, Object> {
+        /** */
+        volatile boolean block;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+            @Nullable Object arg) throws IgniteException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+
+            for (ClusterNode node : subgrid)
+                jobs.put(new SimplestJob(block), node);
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
+            return "1";
+        }
+    }
+
+    /** */
+    @IgniteSpiMultipleInstancesSupport(true)
+    public static class GridTestCollision extends IgniteSpiAdapter implements CollisionSpi {
+        /** */
+        HashMap<ComputeJob, CollisionJobContext> jobs = new HashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void onCollision(CollisionContext ctx) {
+            for (CollisionJobContext jobCtx : ctx.waitingJobs())
+                jobs.put(jobCtx.getJob(), jobCtx);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStop() throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setExternalCollisionListener(CollisionExternalListener lsnr) {
+            // No-op.
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteJobMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteJobMetricsSelfTestSuite.java
index 818f0de..fe9a2fe 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteJobMetricsSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteJobMetricsSelfTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessorLoadTest;
+import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSelfTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -25,6 +26,9 @@ import org.junit.runners.Suite;
  * Job metrics self test suite.
  */
 @RunWith(Suite.class)
-@Suite.SuiteClasses({GridJobMetricsProcessorLoadTest.class})
+@Suite.SuiteClasses({
+    GridJobMetricsProcessorLoadTest.class,
+    GridJobMetricsSelfTest.class,
+})
 public class IgniteJobMetricsSelfTestSuite {
 }