You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/03/10 17:35:30 UTC

[ignite] branch master updated: IGNITE-12756 TcpCommunication SPI metrics improvement

This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 683f22e  IGNITE-12756 TcpCommunication SPI metrics improvement
683f22e is described below

commit 683f22e64fe57b2de47cfe36cf40b237ce7aaf8e
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue Feb 25 19:55:01 2020 +0300

    IGNITE-12756 TcpCommunication SPI metrics improvement
---
 .../internal/managers/GridManagerAdapter.java      |  18 +++
 .../managers/communication/GridIoManager.java      |  94 ++++++-------
 .../communication/IgniteMessageFactoryImpl.java    |  37 ++++-
 .../processors/resource/GridResourceIoc.java       |  19 +--
 .../processors/resource/GridResourceProcessor.java |   9 +-
 .../internal/resources/MetricManagerResource.java  |  32 -----
 .../org/apache/ignite/spi/IgniteSpiAdapter.java    |  33 ++++-
 .../org/apache/ignite/spi/IgniteSpiContext.java    |  36 +++++
 .../tcp/TcpCommunicationMetricsListener.java       | 156 ++++++++++++---------
 .../spi/communication/tcp/TcpCommunicationSpi.java |  29 ++--
 .../IgniteMessageFactoryImplTest.java              |  47 ++++++-
 .../ignite/testframework/GridSpiTestContext.java   |  30 +++-
 .../testframework/junits/IgniteTestResources.java  |  16 +--
 13 files changed, 357 insertions(+), 199 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index d2952d9..d10d774 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Consumer;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.expiry.TouchedExpiryPolicy;
@@ -62,6 +63,7 @@ import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -624,6 +626,22 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         ctx.discovery().resolveCommunicationError(node, err);
                     }
 
+                    @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
+                        return ctx.metric().registry(name);
+                    }
+
+                    @Override public void removeMetricRegistry(String name) {
+                        ctx.metric().remove(name);
+                    }
+
+                    @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
+                        return ctx.metric();
+                    }
+
+                    @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
+                        ctx.metric().addMetricRegistryCreationListener(lsnr);
+                    }
+
                     /**
                      * @param e Exception to handle.
                      * @return GridSpiException Converted exception.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index b40a166..4ae64b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -416,53 +416,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        startSpi();
-
-        MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
-
-        CommunicationSpi spi = ctx.config().getCommunicationSpi();
-
-        ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
-                "Outbound messages queue size.");
-
-        ioMetric.register(SENT_MSG_CNT, spi::getSentMessagesCount, "Sent messages count.");
-
-        ioMetric.register(SENT_BYTES_CNT, spi::getSentBytesCount, "Sent bytes count.");
-
-        ioMetric.register(RCVD_MSGS_CNT, spi::getReceivedMessagesCount,
-                "Received messages count.");
-
-        ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
-
-        getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
-            @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
-                try {
-                    onMessage0(nodeId, (GridIoMessage)msg, msgC);
-                }
-                catch (ClassCastException ignored) {
-                    U.error(log, "Communication manager received message of unknown type (will ignore): " +
-                        msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
-                        "which is illegal - make sure to send messages only via GridProjection API.");
-                }
-            }
-
-            @Override public void onDisconnected(UUID nodeId) {
-                for (GridDisconnectListener lsnr : disconnectLsnrs)
-                    lsnr.onNodeDisconnected(nodeId);
-            }
-
-            @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
-                try {
-                    onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
-                }
-                catch (ClassCastException ignored) {
-                    U.error(log, "Communication manager received message of unknown type (will ignore): " +
-                        initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
-                        "which is illegal - make sure to send messages only via GridProjection API.");
-                }
-            }
-        });
-
         ctx.addNodeAttribute(DIRECT_PROTO_VER_ATTR, DIRECT_PROTO_VER);
 
         MessageFormatter[] formatterExt = ctx.plugins().extensions(MessageFormatter.class);
@@ -512,6 +465,53 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         msgFactory = new IgniteMessageFactoryImpl(msgs);
 
+        startSpi();
+
+        MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
+
+        CommunicationSpi spi = ctx.config().getCommunicationSpi();
+
+        ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
+                "Outbound messages queue size.");
+
+        ioMetric.register(SENT_MSG_CNT, spi::getSentMessagesCount, "Sent messages count.");
+
+        ioMetric.register(SENT_BYTES_CNT, spi::getSentBytesCount, "Sent bytes count.");
+
+        ioMetric.register(RCVD_MSGS_CNT, spi::getReceivedMessagesCount,
+                "Received messages count.");
+
+        ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
+
+        getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
+            @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
+                try {
+                    onMessage0(nodeId, (GridIoMessage)msg, msgC);
+                }
+                catch (ClassCastException ignored) {
+                    U.error(log, "Communication manager received message of unknown type (will ignore): " +
+                            msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+                            "which is illegal - make sure to send messages only via GridProjection API.");
+                }
+            }
+
+            @Override public void onDisconnected(UUID nodeId) {
+                for (GridDisconnectListener lsnr : disconnectLsnrs)
+                    lsnr.onNodeDisconnected(nodeId);
+            }
+
+            @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
+                try {
+                    onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
+                }
+                catch (ClassCastException ignored) {
+                    U.error(log, "Communication manager received message of unknown type (will ignore): " +
+                            initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+                            "which is illegal - make sure to send messages only via GridProjection API.");
+                }
+            }
+        });
+
         if (log.isDebugEnabled())
             log.debug(startInfo());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
index 957ef7c..68ce797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
@@ -45,6 +45,15 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
     /** Initialized flag. If {@code true} then new message type couldn't be registered. */
     private boolean initialized;
 
+    /** Min index of registered message supplier. */
+    private int minIdx = Integer.MAX_VALUE;
+
+    /** Max index of registered message supplier. */
+    private int maxIdx = -1;
+
+    /** Count of registered message suppliers. */
+    private int cnt;
+
     /**
      * Contructor.
      *
@@ -97,15 +106,21 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
 
         Supplier<Message> curr = msgSuppliers[idx];
 
-        if (curr == null)
+        if (curr == null) {
             msgSuppliers[idx] = supplier;
+
+            minIdx = Math.min(idx, minIdx);
+
+            maxIdx = Math.max(idx, maxIdx);
+
+            cnt++;
+        }
         else
             throw new IgniteException("Message factory is already registered for direct type: " + directType);
     }
 
     /**
      * Creates new message instance of provided direct type.
-     * <p>
      *
      * @param directType Message direct type.
      * @return Message instance.
@@ -121,6 +136,24 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
     }
 
     /**
+     * Returns direct types of all registered messages.
+     *
+     * @return Direct types of all registered messages.
+     */
+    public short[] registeredDirectTypes() {
+        short[] res = new short[cnt];
+
+        if (cnt > 0) {
+            for (int i = minIdx, p = 0; i <= maxIdx; i++) {
+                if (msgSuppliers[i] != null)
+                    res[p++] = indexToDirectType(i);
+            }
+        }
+
+        return res;
+    }
+
+    /**
      * @param directType Direct type.
      */
     private static int directTypeToIndex(short directType) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 10af241..af6a75f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
-import org.apache.ignite.internal.resources.MetricManagerResource;
 import org.apache.ignite.internal.util.GridLeanIdentitySet;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -133,7 +132,7 @@ public class GridResourceIoc {
                 break;
 
             if (dep != null) {
-                Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
+                Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.newCSet());
 
                 classes.add(cls);
 
@@ -266,7 +265,7 @@ public class GridResourceIoc {
 
             boolean allowImplicitInjection = !GridNoImplicitInjection.class.isAssignableFrom(cls);
 
-            for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
+            for (Class<?> cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
                 for (Field field : cls0.getDeclaredFields()) {
                     Annotation[] fieldAnns = field.getAnnotations();
 
@@ -274,9 +273,7 @@ public class GridResourceIoc {
                         T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
 
                         if (t2 == null) {
-                            t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
-                                new ArrayList<GridResourceField>(),
-                                new ArrayList<GridResourceMethod>());
+                            t2 = new T2<>(new ArrayList<>(), new ArrayList<>());
 
                             annMap.put(ann.annotationType(), t2);
                         }
@@ -299,9 +296,7 @@ public class GridResourceIoc {
                         T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
 
                         if (t2 == null) {
-                            t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
-                                new ArrayList<GridResourceField>(),
-                                new ArrayList<GridResourceMethod>());
+                            t2 = new T2<>(new ArrayList<>(), new ArrayList<>());
 
                             annMap.put(ann.annotationType(), t2);
                         }
@@ -515,9 +510,6 @@ public class GridResourceIoc {
         CACHE_STORE_SESSION(CacheStoreSessionResource.class),
 
         /** */
-        METRIC_MANAGER(MetricManagerResource.class),
-
-        /** */
         FILESYSTEM_RESOURCE(FileSystemResource.class);
 
         /** */
@@ -541,8 +533,7 @@ public class GridResourceIoc {
             ResourceAnnotation.SPRING,
             ResourceAnnotation.IGNITE_INSTANCE,
             ResourceAnnotation.LOGGER,
-            ResourceAnnotation.SERVICE,
-            ResourceAnnotation.METRIC_MANAGER
+            ResourceAnnotation.SERVICE
         ),
 
         /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index 07cd319..fed25c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -74,8 +74,6 @@ public class GridResourceProcessor extends GridProcessorAdapter {
             new GridResourceLoggerInjector(ctx.config().getGridLogger());
         injectorByAnnotation[GridResourceIoc.ResourceAnnotation.IGNITE_INSTANCE.ordinal()] =
             new GridResourceBasicInjector<>(ctx.grid());
-        injectorByAnnotation[GridResourceIoc.ResourceAnnotation.METRIC_MANAGER.ordinal()] =
-            new GridResourceSupplierInjector<>(ctx::metric);
     }
 
     /** {@inheritDoc} */
@@ -336,6 +334,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
             case LOAD_BALANCER:
             case TASK_CONTINUOUS_MAPPER:
             case CACHE_STORE_SESSION:
+            case FILESYSTEM_RESOURCE:
                 res = new GridResourceBasicInjector<>(param);
                 break;
 
@@ -343,10 +342,6 @@ public class GridResourceProcessor extends GridProcessorAdapter {
                 res = new GridResourceJobContextInjector((ComputeJobContext)param);
                 break;
 
-            case FILESYSTEM_RESOURCE:
-                res = new GridResourceBasicInjector<>(param);
-                break;
-
             default:
                 res = injectorByAnnotation[ann.ordinal()];
                 break;
@@ -411,7 +406,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
         injectToJob(dep, taskCls, obj, ses, jobCtx);
 
         if (obj instanceof GridInternalWrapper) {
-            Object usrObj = ((GridInternalWrapper)obj).userObject();
+            Object usrObj = ((GridInternalWrapper<?>)obj).userObject();
 
             if (usrObj != null)
                 injectToJob(dep, taskCls, usrObj, ses, jobCtx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java b/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java
deleted file mode 100644
index 21cddd4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.resources;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/** */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.METHOD, ElementType.FIELD})
-public @interface MetricManagerResource {
-    // No-op.
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index f378012..9fa83c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -34,6 +35,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -54,9 +56,11 @@ import org.apache.ignite.plugin.security.SecuritySubject;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 
 /**
@@ -664,8 +668,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
                     "'IgniteConfiguration.metricsUpdateFrequency' to prevent unnecessary status checking.");
         }
         // Intentionally compare references using '!=' below
-        else if (ignite.configuration().getFailureDetectionTimeout() !=
-                IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT)
+        else if (ignite.configuration().getFailureDetectionTimeout() != DFLT_FAILURE_DETECTION_TIMEOUT)
             log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)");
 
         clientFailureDetectionTimeout = ignite.configuration().getClientFailureDetectionTimeout();
@@ -841,7 +844,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
 
         /** {@inheritDoc} */
         @Override public Collection<ClusterNode> nodes() {
-            return  locNode == null  ? Collections.<ClusterNode>emptyList() : Collections.singletonList(locNode);
+            return  locNode == null  ? Collections.emptyList() : Collections.singletonList(locNode);
         }
 
         /** {@inheritDoc} */
@@ -946,7 +949,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
             if (!(ignite0 instanceof IgniteKernal))
                 throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
 
-            ((IgniteKernal)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+            ((IgniteEx)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
         }
 
         /** {@inheritDoc} */
@@ -956,7 +959,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
             if (!(ignite0 instanceof IgniteKernal))
                 throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
 
-            ((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+            ((IgniteEx)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
         }
 
         /** {@inheritDoc} */
@@ -973,5 +976,25 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
         @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
             throw new UnsupportedOperationException();
         }
+
+        /** {@inheritDoc} */
+        @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeMetricRegistry(String name) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
+            // No-op.
+        }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index d4402f4..9c08aca 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Consumer;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -28,10 +29,12 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.security.SecuritySubject;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -376,4 +379,37 @@ public interface IgniteSpiContext {
      * @param err Error.
      */
     public void resolveCommunicationFailure(ClusterNode node, Exception err);
+
+    /**
+     * Returns existing or newly created instance of metric registry with given name.
+     *
+     * @param name Metric registry name.
+     * @return Existing or newly created instance of metric registry.
+     */
+    @IgniteExperimental
+    public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name);
+
+    /**
+     * Removes metric registry with given name.
+     *
+     * @param name Metric registry name.
+     */
+    @IgniteExperimental
+    public void removeMetricRegistry(String name);
+
+    /**
+     * Returns all registered metric registries.
+     *
+     * @return All registered metric registries.
+     */
+    @IgniteExperimental
+    public Iterable<ReadOnlyMetricRegistry> metricRegistries();
+
+    /**
+     * Registers a listener that is notified when a metric registry is created.
+     *
+     * @param lsnr Listener.
+     */
+    @IgniteExperimental
+    public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index afece1f..86608fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -27,10 +27,15 @@ import java.util.function.Function;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiContext;
 import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.spi.metric.Metric;
 import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
@@ -60,15 +65,15 @@ import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_M
  * Statistics for {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi}.
  */
 class TcpCommunicationMetricsListener {
-    /** Metrics manager. */
-    private final GridMetricManager mmgr;
-
-    /** Metrics registry. */
-    private final org.apache.ignite.internal.processors.metric.MetricRegistry mreg;
+    /** SPI context. */
+    private final IgniteSpiContext spiCtx;
 
     /** Current ignite instance. */
     private final Ignite ignite;
 
+    /** Metrics registry. */
+    private final MetricRegistry mreg;
+
     /** All registered metrics. */
     private final Set<ThreadMetrics> allMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
@@ -81,12 +86,6 @@ class TcpCommunicationMetricsListener {
         return metrics;
     });
 
-    /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code sentMsgsMetricsByType}. */
-    private final Function<Short, LongAdderMetric> sentMsgsCntByTypeMetricFactory;
-
-    /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code rcvdMsgsMetricsByType}. */
-    private final Function<Short, LongAdderMetric> rcvdMsgsCntByTypeMetricFactory;
-
     /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code #sentMsgsMetricsByConsistentId}. */
     private final Function<Object, LongAdderMetric> sentMsgsCntByConsistentIdMetricFactory;
 
@@ -105,35 +104,37 @@ class TcpCommunicationMetricsListener {
     /** Received messages count metric. */
     private final LongAdderMetric rcvdMsgsMetric;
 
+    /** Counters of sent and received messages by direct type. */
+    private final IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> msgCntrsByType;
+
     /** Method to synchronize access to message type map. */
-    private final Object msgTypMapMux = new Object();
+    private final Object msgTypeMapMux = new Object();
 
     /** Message type map. */
     private volatile Map<Short, String> msgTypeMap;
 
     /** */
-    public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
-        this.mmgr = mmgr;
+    public TcpCommunicationMetricsListener(Ignite ignite, IgniteSpiContext spiCtx) {
         this.ignite = ignite;
+        this.spiCtx = spiCtx;
+
+        mreg = (MetricRegistry)spiCtx.getOrCreateMetricRegistry(COMMUNICATION_METRICS_GROUP_NAME);
 
-        mreg = mmgr.registry(COMMUNICATION_METRICS_GROUP_NAME);
+        msgCntrsByType = createMessageCounters((IgniteMessageFactory)spiCtx.messageFactory());
 
-        sentMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric(
-            sentMessagesByTypeMetricName(directType),
-            SENT_MESSAGES_BY_TYPE_METRIC_DESC
-        );
-        rcvdMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric(
-            receivedMessagesByTypeMetricName(directType),
-            RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC
-        );
+        sentMsgsCntByConsistentIdMetricFactory = consistentId -> {
+            String name = metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString());
 
-        sentMsgsCntByConsistentIdMetricFactory = consistentId ->
-            mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()))
-                .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
+            return spiCtx.getOrCreateMetricRegistry(name)
+                    .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
+        };
 
-        rcvdMsgsCntByConsistentIdMetricFactory = consistentId ->
-            mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()))
-                .findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
+        rcvdMsgsCntByConsistentIdMetricFactory = consistentId -> {
+            String name = metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString());
+
+            return spiCtx.getOrCreateMetricRegistry(name)
+                    .findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
+        };
 
         sentBytesMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC);
         rcvdBytesMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC);
@@ -141,17 +142,49 @@ class TcpCommunicationMetricsListener {
         sentMsgsMetric = mreg.longAdderMetric(SENT_MESSAGES_METRIC_NAME, SENT_MESSAGES_METRIC_DESC);
         rcvdMsgsMetric = mreg.longAdderMetric(RECEIVED_MESSAGES_METRIC_NAME, RECEIVED_MESSAGES_METRIC_DESC);
 
-        mmgr.addMetricRegistryCreationListener(mreg -> {
+        spiCtx.addMetricRegistryCreationListener(mreg -> {
             // Metrics for the specific nodes.
             if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR))
                 return;
 
-            ((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
+            ((MetricRegistry)mreg).longAdderMetric(
+                    SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
+                    SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
+            );
 
-            ((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
+            ((MetricRegistry)mreg).longAdderMetric(
+                    RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
+                    RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
+            );
         });
     }
 
+    /**
+     * Builds the per-direct-type message counters, registering each of them in the metric registry.
+     *
+     * @param factory Message factory, expected to be an {@link IgniteMessageFactoryImpl}.
+     * @return Map from direct type to a tuple of counters: sent messages first, received messages second.
+     */
+    private IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> createMessageCounters(IgniteMessageFactory factory) {
+        short[] types = ((IgniteMessageFactoryImpl)factory).registeredDirectTypes();
+
+        IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> cntrs = new IntHashMap<>(types.length);
+
+        for (int i = 0; i < types.length; i++) {
+            short directType = types[i];
+
+            LongAdderMetric sent = mreg.longAdderMetric(
+                sentMessagesByTypeMetricName(directType), SENT_MESSAGES_BY_TYPE_METRIC_DESC);
+
+            LongAdderMetric rcvd = mreg.longAdderMetric(
+                receivedMessagesByTypeMetricName(directType), RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC);
+
+            cntrs.put(directType, new IgniteBiTuple<>(sent, rcvd));
+        }
+
+        return cntrs;
+    }
+
     /** Metrics registry. */
     public MetricRegistry metricRegistry() {
         return mreg;
@@ -285,10 +318,10 @@ class TcpCommunicationMetricsListener {
             if (metric.name().startsWith(prefix)) {
                 short directType = Short.parseShort(metric.name().substring(prefix.length()));
 
-                Map<Short, String> msgTypMap0 = msgTypeMap;
+                Map<Short, String> msgTypeMap0 = msgTypeMap;
 
-                if (msgTypMap0 != null) {
-                    String typeName = msgTypMap0.get(directType);
+                if (msgTypeMap0 != null) {
+                    String typeName = msgTypeMap0.get(directType);
 
                     if (typeName != null)
                         res.put(typeName, ((LongMetric)metric).value());
@@ -309,7 +342,7 @@ class TcpCommunicationMetricsListener {
 
         String mregPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR;
 
-        for (ReadOnlyMetricRegistry mreg : mmgr) {
+        for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
             if (mreg.name().startsWith(mregPrefix)) {
                 String nodeConsIdStr = mreg.name().substring(mregPrefix.length());
 
@@ -342,7 +375,7 @@ class TcpCommunicationMetricsListener {
                 metric.reset();
         }
 
-        for (ReadOnlyMetricRegistry mreg : mmgr) {
+        for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
             if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR)) {
                 mreg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
 
@@ -363,7 +396,7 @@ class TcpCommunicationMetricsListener {
             threadMetrics.rcvdMsgsMetricsByConsistentId = new HashMap<>();
         }
 
-        mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()));
+        spiCtx.removeMetricRegistry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()));
     }
 
     /**
@@ -374,24 +407,24 @@ class TcpCommunicationMetricsListener {
     private void updateMessageTypeMap(Message msg) {
         short typeId = msg.directType();
 
-        Map<Short, String> msgTypMap0 = msgTypeMap;
+        Map<Short, String> msgTypeMap0 = msgTypeMap;
 
-        if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
-            synchronized (msgTypMapMux) {
+        if (msgTypeMap0 == null || !msgTypeMap0.containsKey(typeId)) {
+            synchronized (msgTypeMapMux) {
                 if (msgTypeMap == null) {
-                    msgTypMap0 = new HashMap<>();
+                    msgTypeMap0 = new HashMap<>();
 
-                    msgTypMap0.put(typeId, msg.getClass().getName());
+                    msgTypeMap0.put(typeId, msg.getClass().getName());
 
-                    msgTypeMap = msgTypMap0;
+                    msgTypeMap = msgTypeMap0;
                 }
                 else {
                     if (!msgTypeMap.containsKey(typeId)) {
-                        msgTypMap0 = new HashMap<>(msgTypeMap);
+                        msgTypeMap0 = new HashMap<>(msgTypeMap);
 
-                        msgTypMap0.put(typeId, msg.getClass().getName());
+                        msgTypeMap0.put(typeId, msg.getClass().getName());
 
-                        msgTypeMap = msgTypMap0;
+                        msgTypeMap = msgTypeMap0;
                     }
                 }
             }
@@ -412,29 +445,22 @@ class TcpCommunicationMetricsListener {
      * Thread-local metrics.
      */
     private class ThreadMetrics {
-        /** Sent messages count metrics grouped by message type. */
-        private final Map<Short, LongAdderMetric> sentMsgsMetricsByType = new HashMap<>();
-
-        /** Received messages count metrics grouped by message type. */
-        private final Map<Short, LongAdderMetric> rcvdMsgsMetricsByType = new HashMap<>();
+        /** Sent messages count metrics grouped by message node consistent id. */
+        volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>();
 
-        /**
-         * Sent messages count metrics grouped by message node consistent id.
-         */
-        public volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>();
-
-        /**
-         * Received messages metrics count grouped by message node consistent id.
-         */
-        public volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>();
+        /** Received messages metrics count grouped by message node consistent id. */
+        volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>();
 
         /**
          * Collects statistics for message sent by SPI.
+         *
          * @param msg Sent message.
          * @param consistentId Receiver node consistent id.
          */
         private void onMessageSent(Message msg, Object consistentId) {
-            sentMsgsMetricsByType.computeIfAbsent(msg.directType(), sentMsgsCntByTypeMetricFactory).increment();
+            IgniteBiTuple<LongAdderMetric, LongAdderMetric> cnts = msgCntrsByType.get(msg.directType());
+            // NOTE(review): presumably null for a direct type absent from msgCntrsByType, giving an NPE below; confirm.
+            cnts.get1().increment(); // First tuple element is the sent-messages counter.
 
             sentMsgsMetricsByConsistentId.computeIfAbsent(consistentId, sentMsgsCntByConsistentIdMetricFactory).increment();
         }
@@ -445,7 +471,9 @@ class TcpCommunicationMetricsListener {
          * @param consistentId Sender node consistent id.
          */
         private void onMessageReceived(Message msg, Object consistentId) {
-            rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(), rcvdMsgsCntByTypeMetricFactory).increment();
+            IgniteBiTuple<LongAdderMetric, LongAdderMetric> cnts = msgCntrsByType.get(msg.directType());
+            // NOTE(review): presumably null for a direct type absent from msgCntrsByType, giving an NPE below; confirm.
+            cnts.get2().increment(); // Second tuple element is the received-messages counter.
 
             rcvdMsgsMetricsByConsistentId.computeIfAbsent(consistentId, rcvdMsgsCntByConsistentIdMetricFactory).increment();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ce324ca..7708789 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -75,9 +75,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
-import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
-import org.apache.ignite.internal.resources.MetricManagerResource;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1399,13 +1397,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         }
     }
 
-    /** */
-    @MetricManagerResource
-    private void injectMetricManager(GridMetricManager mmgr) {
-        if (mmgr != null)
-            metricsLsnr = new TcpCommunicationMetricsListener(mmgr, ignite);
-    }
-
     /**
      * Sets local host address for socket binding. Note that one node could have
      * additional addresses beside the loopback one. This configuration
@@ -2039,21 +2030,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
     /** {@inheritDoc} */
     @Override public int getSentMessagesCount() {
+        // The listener may not be initialized yet while the discovery thread already tries to aggregate metrics.
+        if (metricsLsnr == null)
+            return 0;
+
         return metricsLsnr.sentMessagesCount();
     }
 
     /** {@inheritDoc} */
     @Override public long getSentBytesCount() {
+        // The listener may not be initialized yet while the discovery thread already tries to aggregate metrics.
+        if (metricsLsnr == null)
+            return 0;
+
         return metricsLsnr.sentBytesCount();
     }
 
     /** {@inheritDoc} */
     @Override public int getReceivedMessagesCount() {
+        // The listener may not be initialized yet while the discovery thread already tries to aggregate metrics.
+        if (metricsLsnr == null)
+            return 0;
+
         return metricsLsnr.receivedMessagesCount();
     }
 
     /** {@inheritDoc} */
     @Override public long getReceivedBytesCount() {
+        // The listener may not be initialized yet while the discovery thread already tries to aggregate metrics.
+        if (metricsLsnr == null)
+            return 0;
+
         return metricsLsnr.receivedBytesCount();
     }
 
@@ -2430,6 +2437,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         ctxInitLatch.countDown();
+
+        metricsLsnr = new TcpCommunicationMetricsListener(ignite, spiCtx);
     }
 
     /** {@inheritDoc} */
@@ -5048,7 +5057,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /**
      *
      */
-    private class ConnectGateway {
+    private static class ConnectGateway {
         /** */
         private GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
index c55cc0d..4ad4040 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -41,6 +42,9 @@ public class IgniteMessageFactoryImplTest {
     /** Test message 2 type. */
     private static final short TEST_MSG_2_TYPE = 2;
 
+    /** Test message 42 type. */
+    private static final short TEST_MSG_42_TYPE = 42;
+
     /** Unknown message type. */
     private static final short UNKNOWN_MSG_TYPE = 0;
 
@@ -63,7 +67,7 @@ public class IgniteMessageFactoryImplTest {
     public void testCreate() {
         MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
 
-        IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
+        IgniteMessageFactoryImpl msgFactory = new IgniteMessageFactoryImpl(factories);
 
         Message msg;
 
@@ -73,8 +77,12 @@ public class IgniteMessageFactoryImplTest {
         msg = msgFactory.create(TEST_MSG_2_TYPE);
         assertTrue(msg instanceof TestMessage2);
 
-        msg = msgFactory.create(TEST_MSG_2_TYPE);
-        assertTrue(msg instanceof TestMessage2);
+        msg = msgFactory.create(TEST_MSG_42_TYPE);
+        assertTrue(msg instanceof TestMessage42);
+
+        short[] directTypes = msgFactory.registeredDirectTypes();
+
+        assertArrayEquals(directTypes, new short[] {TEST_MSG_1_TYPE, TEST_MSG_2_TYPE, TEST_MSG_42_TYPE});
     }
 
     /**
@@ -111,6 +119,7 @@ public class IgniteMessageFactoryImplTest {
         /** {@inheritDoc} */
         @Override public void registerAll(IgniteMessageFactory factory) {
             factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
+            factory.register(TEST_MSG_42_TYPE, TestMessage42::new);
         }
     }
 
@@ -154,7 +163,7 @@ public class IgniteMessageFactoryImplTest {
 
         /** {@inheritDoc} */
         @Override public short directType() {
-            return 1;
+            return TEST_MSG_1_TYPE;
         }
 
         /** {@inheritDoc} */
@@ -182,7 +191,35 @@ public class IgniteMessageFactoryImplTest {
 
         /** {@inheritDoc} */
         @Override public short directType() {
-            return 2;
+            return TEST_MSG_2_TYPE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte fieldsCount() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAckReceived() {
+            // No-op.
+        }
+    }
+
+    /** Test message. */
+    private static class TestMessage42 implements Message {
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public short directType() {
+            return TEST_MSG_42_TYPE;
         }
 
         /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index fdea2b0..a15d7a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -45,12 +46,14 @@ import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -62,6 +65,7 @@ import org.apache.ignite.spi.IgniteSpiContext;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -90,7 +94,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     private final Map<ClusterNode, Serializable> sentMsgs = new HashMap<>();
 
     /** */
-    private final ConcurrentMap<String, Map> cache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Map<?, ?>> cache = new ConcurrentHashMap<>();
 
     /** */
     private MessageFormatter formatter;
@@ -109,6 +113,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     @Override public Collection<ClusterNode> remoteNodes() {
         return rmtNodes;
     }
@@ -611,14 +616,33 @@ public class GridSpiTestContext implements IgniteSpiContext {
         throw new UnsupportedOperationException();
     }
 
+    /** {@inheritDoc} <p>Test stub: every call returns a new registry; this method does not store it. */
+    @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
+        return new MetricRegistry(name, null, null, new NullLogger());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeMetricRegistry(String name) {
+        // No-op: this test stub has nothing to remove.
+    }
+
+    /** {@inheritDoc} <p>Test stub: always empty and never {@code null}, so callers can iterate the result safely. */
+    @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
+        return java.util.Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
+        // No-op: the listener is dropped here and is never invoked by this method.
+    }
+
     /**
      * @param cacheName Cache name.
      * @return Map representing cache.
      */
-    @SuppressWarnings("unchecked")
     private <K, V> Map<K, V> getOrCreateCache(String cacheName) {
         synchronized (cache) {
-            Map<K, V> map = cache.get(cacheName);
+            Map<K, V> map = (Map<K, V>)cache.get(cacheName);
 
             if (map == null)
                 cache.put(cacheName, map = new ConcurrentHashMap<>());
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
index a154fe5..f935e05 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
-import org.apache.ignite.internal.resources.MetricManagerResource;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.logger.NullLogger;
@@ -88,10 +87,8 @@ public class IgniteTestResources {
         return U.IGNITE_MBEANS_DISABLED ? null : ManagementFactory.getPlatformMBeanServer();
     }
 
-    /**
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgniteTestResources() throws IgniteCheckedException {
+    /** */
+    public IgniteTestResources() {
         if (SensitiveInfoTestLoggerProxy.TEST_SENSITIVE)
             log = new SensitiveInfoTestLoggerProxy(rootLog.getLogger(getClass()), null, null, null);
         else
@@ -107,7 +104,7 @@ public class IgniteTestResources {
     /**
      * @param cfg Ignite configuration
      */
-    public IgniteTestResources(IgniteConfiguration cfg) throws IgniteCheckedException {
+    public IgniteTestResources(IgniteConfiguration cfg) {
         this.cfg = cfg;
         this.log = rootLog.getLogger(getClass());
         this.jmx = prepareMBeanServer();
@@ -118,7 +115,7 @@ public class IgniteTestResources {
     /**
      * @param jmx JMX server.
      */
-    public IgniteTestResources(MBeanServer jmx) throws IgniteCheckedException {
+    public IgniteTestResources(MBeanServer jmx) {
         assert jmx != null;
 
         this.jmx = jmx;
@@ -130,7 +127,7 @@ public class IgniteTestResources {
     /**
      * @param log Logger.
      */
-    public IgniteTestResources(IgniteLogger log) throws IgniteCheckedException {
+    public IgniteTestResources(IgniteLogger log) {
         assert log != null;
 
         this.log = log.getLogger(getClass());
@@ -167,7 +164,7 @@ public class IgniteTestResources {
      */
     public void startThreads(boolean prestart) {
         execSvc = new IgniteThreadPoolExecutor(nodeId.toString(), null, 40, 40, Long.MAX_VALUE,
-            new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<>());
 
         // Improve concurrency for testing.
         if (prestart)
@@ -194,7 +191,6 @@ public class IgniteTestResources {
         rsrcProc.injectBasicResource(target, LoggerResource.class, getLogger().getLogger(target.getClass()));
         rsrcProc.injectBasicResource(target, IgniteInstanceResource.class,
             new IgniteMock(null, locHost, nodeId, getMarshaller(), jmx, home, cfg));
-        rsrcProc.injectBasicResource(target, MetricManagerResource.class, ctx.metric());
     }
 
     /**